Skip to content

ccproxy.services.adapters.chat_accumulator

ccproxy.services.adapters.chat_accumulator

ChatCompletion accumulator for OpenAI streaming format.

ChatCompletionAccumulator

ChatCompletionAccumulator()

Accumulator for OpenAI ChatCompletion streaming format.

Handles partial tool calls and other streaming data by accumulating chunks until complete objects are ready for validation.

Follows the OpenAI SDK ChatCompletionStreamManager pattern.

Source code in ccproxy/services/adapters/chat_accumulator.py
def __init__(self) -> None:
    """Start with empty accumulation state and no tool call being tracked."""
    # No tool call is being tracked yet.
    self._current_tool_call_index: int | None = None
    # Integer bookkeeping for tool calls; presumably the indices of tool
    # calls that have finished streaming -- confirm against the tracking code.
    self._done_tool_calls: set[int] = set()
    # Merged chunk data; an empty dict means nothing has been buffered yet.
    self._accumulated: dict[str, Any] = dict()

accumulate_chunk

accumulate_chunk(chunk)

Accumulate a streaming chunk and return complete object if ready.

Parameters:

Name Type Description Default
chunk dict[str, Any]

The incoming stream chunk data

required

Returns:

Type Description
dict[str, Any] | None

None if accumulation is ongoing, or the complete object when ready for validation

Source code in ccproxy/services/adapters/chat_accumulator.py
def accumulate_chunk(self, chunk: dict[str, Any]) -> dict[str, Any] | None:
    """Accumulate a streaming chunk and return complete object if ready.

    Args:
        chunk: The incoming stream chunk data

    Returns:
        None if accumulation is ongoing, or the complete object when ready
        for validation
    """
    # For chunks without tool calls, return immediately UNLESS we have accumulated state
    # (in which case this might be a finish_reason chunk)
    if not self._has_tool_calls(chunk) and not self._accumulated:
        return chunk

    # For the first chunk, copy the base structure
    if not self._accumulated:
        self._accumulated = copy.deepcopy(chunk)
    else:
        # For subsequent chunks, preserve base fields and only accumulate deltas
        base_fields = {"id", "object", "created", "model"}
        chunk_copy = copy.deepcopy(chunk)

        # Remove base fields from chunk_copy to avoid concatenation
        for field in base_fields:
            if field in chunk_copy:
                del chunk_copy[field]

        # Use accumulate_delta for the remaining fields (choices, etc.)
        self._accumulated = accumulate_delta(self._accumulated, chunk_copy)

    # Track tool call progress if present
    if self._has_tool_calls(chunk):
        self._track_tool_call_progress(chunk)

    # Don't validate if we have incomplete tool calls
    if self._has_incomplete_tool_calls():
        return None  # Continue accumulating

    # Return a copy for validation if chunk seems complete
    if self._should_emit_chunk(chunk):
        return copy.deepcopy(self._accumulated)

    # Continue accumulating
    return None

reset

reset()

Reset accumulator state for next message.

Source code in ccproxy/services/adapters/chat_accumulator.py
def reset(self) -> None:
    """Discard all buffered state so the next message starts clean.

    Both containers are emptied in place rather than replaced, and the
    tracked tool call index is set back to ``None``.
    """
    for container in (self._accumulated, self._done_tool_calls):
        container.clear()
    self._current_tool_call_index = None