ccproxy.adapters.openai.streaming

OpenAI streaming response formatting.

This module provides Server-Sent Events (SSE) formatting for OpenAI-compatible streaming responses.

OpenAISSEFormatter

Formats streaming responses to match OpenAI's SSE format.

format_data_event staticmethod

format_data_event(data)

Format a data event for OpenAI-compatible Server-Sent Events.

Parameters:

Name Type Description Default
data dict[str, Any]

Event data dictionary

required

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_data_event(data: dict[str, Any]) -> str:
    """Format a data event for OpenAI-compatible Server-Sent Events.

    Args:
        data: Event data dictionary

    Returns:
        Formatted SSE string
    """
    json_data = json.dumps(data, separators=(",", ":"))
    return f"data: {json_data}\n\n"

format_first_chunk staticmethod

format_first_chunk(
    message_id, model, created, role="assistant"
)

Format the first chunk with role and basic metadata.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the completion

required
model str

Model name being used

required
created int

Unix timestamp when the completion was created

required
role str

Role of the assistant

'assistant'

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_first_chunk(
    message_id: str, model: str, created: int, role: str = "assistant"
) -> str:
    """Build the opening streaming chunk that announces the role.

    Args:
        message_id: Unique identifier for the completion.
        model: Model name being used.
        created: Unix timestamp when the completion was created.
        role: Role reported in the delta (defaults to "assistant").

    Returns:
        Formatted SSE string.
    """
    # The first chunk carries only the role; content follows in later deltas.
    opening_choice = {
        "index": 0,
        "delta": {"role": role},
        "logprobs": None,
        "finish_reason": None,
    }
    return OpenAISSEFormatter.format_data_event(
        {
            "id": message_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [opening_choice],
        }
    )

format_content_chunk staticmethod

format_content_chunk(
    message_id, model, created, content, choice_index=0
)

Format a content chunk with text delta.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the completion

required
model str

Model name being used

required
created int

Unix timestamp when the completion was created

required
content str

Text content to include in the delta

required
choice_index int

Index of the choice (usually 0)

0

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_content_chunk(
    message_id: str, model: str, created: int, content: str, choice_index: int = 0
) -> str:
    """Build a streaming chunk carrying a text delta.

    Args:
        message_id: Unique identifier for the completion.
        model: Model name being used.
        created: Unix timestamp when the completion was created.
        content: Text fragment to place in the delta.
        choice_index: Index of the choice (usually 0).

    Returns:
        Formatted SSE string.
    """
    text_choice = {
        "index": choice_index,
        "delta": {"content": content},
        "logprobs": None,
        "finish_reason": None,
    }
    chunk = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [text_choice],
    }
    return OpenAISSEFormatter.format_data_event(chunk)

format_tool_call_chunk staticmethod

format_tool_call_chunk(
    message_id,
    model,
    created,
    tool_call_id,
    function_name=None,
    function_arguments=None,
    tool_call_index=0,
    choice_index=0,
)

Format a tool call chunk.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the completion

required
model str

Model name being used

required
created int

Unix timestamp when the completion was created

required
tool_call_id str

ID of the tool call

required
function_name str | None

Name of the function being called

None
function_arguments str | None

Arguments for the function

None
tool_call_index int

Index of the tool call

0
choice_index int

Index of the choice (usually 0)

0

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_tool_call_chunk(
    message_id: str,
    model: str,
    created: int,
    tool_call_id: str,
    function_name: str | None = None,
    function_arguments: str | None = None,
    tool_call_index: int = 0,
    choice_index: int = 0,
) -> str:
    """Build a streaming chunk carrying a tool-call delta.

    Args:
        message_id: Unique identifier for the completion.
        model: Model name being used.
        created: Unix timestamp when the completion was created.
        tool_call_id: ID of the tool call.
        function_name: Name of the function being called, if known.
        function_arguments: (Partial) JSON argument string, if any.
        tool_call_index: Index of the tool call in the delta array.
        choice_index: Index of the choice (usually 0).

    Returns:
        Formatted SSE string.
    """
    # Only include the function fields actually supplied; streaming
    # clients accumulate name/arguments across successive chunks.
    function_delta: dict[str, Any] = {}
    if function_name is not None:
        function_delta["name"] = function_name
    if function_arguments is not None:
        function_delta["arguments"] = function_arguments

    tool_call_entry: dict[str, Any] = {
        "index": tool_call_index,
        "id": tool_call_id,
        "type": "function",
        "function": function_delta,
    }

    return OpenAISSEFormatter.format_data_event(
        {
            "id": message_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {
                    "index": choice_index,
                    "delta": {"tool_calls": [tool_call_entry]},
                    "logprobs": None,
                    "finish_reason": None,
                }
            ],
        }
    )

format_final_chunk staticmethod

format_final_chunk(
    message_id,
    model,
    created,
    finish_reason="stop",
    choice_index=0,
    usage=None,
)

Format the final chunk with finish_reason.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the completion

required
model str

Model name being used

required
created int

Unix timestamp when the completion was created

required
finish_reason str

Reason for completion (stop, length, tool_calls, etc.)

'stop'
choice_index int

Index of the choice (usually 0)

0
usage dict[str, int] | None

Optional usage information to include

None

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_final_chunk(
    message_id: str,
    model: str,
    created: int,
    finish_reason: str = "stop",
    choice_index: int = 0,
    usage: dict[str, int] | None = None,
) -> str:
    """Build the closing chunk that carries the finish_reason.

    Args:
        message_id: Unique identifier for the completion.
        model: Model name being used.
        created: Unix timestamp when the completion was created.
        finish_reason: Reason for completion (stop, length, tool_calls, etc.)
        choice_index: Index of the choice (usually 0).
        usage: Optional usage information to include.

    Returns:
        Formatted SSE string.
    """
    base: dict[str, Any] = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": choice_index,
                "delta": {},
                "logprobs": None,
                "finish_reason": finish_reason,
            }
        ],
    }
    # Usage is attached only when provided and non-empty.
    extra = {"usage": usage} if usage else {}
    return OpenAISSEFormatter.format_data_event({**base, **extra})

format_error_chunk staticmethod

format_error_chunk(
    message_id, model, created, error_type, error_message
)

Format an error chunk.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the completion

required
model str

Model name being used

required
created int

Unix timestamp when the completion was created

required
error_type str

Type of error

required
error_message str

Error message

required

Returns:

Type Description
str

Formatted SSE string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_error_chunk(
    message_id: str, model: str, created: int, error_type: str, error_message: str
) -> str:
    """Build a chunk reporting a stream error.

    Args:
        message_id: Unique identifier for the completion.
        model: Model name being used.
        created: Unix timestamp when the completion was created.
        error_type: Type of error.
        error_message: Error message.

    Returns:
        Formatted SSE string.
    """
    # An empty delta with finish_reason "error" closes the choice;
    # the error details ride in a top-level "error" object.
    error_choice = {
        "index": 0,
        "delta": {},
        "logprobs": None,
        "finish_reason": "error",
    }
    return OpenAISSEFormatter.format_data_event(
        {
            "id": message_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [error_choice],
            "error": {"type": error_type, "message": error_message},
        }
    )

format_done staticmethod

format_done()

Format the final DONE event.

Returns:

Type Description
str

Formatted SSE termination string

Source code in ccproxy/adapters/openai/streaming.py
@staticmethod
def format_done() -> str:
    """Format the final DONE event.

    Returns:
        Formatted SSE termination string
    """
    return "data: [DONE]\n\n"

OpenAIStreamProcessor

OpenAIStreamProcessor(
    message_id=None,
    model="claude-3-5-sonnet-20241022",
    created=None,
    enable_usage=True,
    enable_tool_calls=True,
    enable_text_chunking=True,
    chunk_size_words=3,
)

Processes Anthropic/Claude streaming responses into OpenAI format.

Parameters:

Name Type Description Default
message_id str | None

Response ID, generated if not provided

None
model str

Model name for responses

'claude-3-5-sonnet-20241022'
created int | None

Creation timestamp, current time if not provided

None
enable_usage bool

Whether to include usage information

True
enable_tool_calls bool

Whether to process tool calls

True
enable_text_chunking bool

Whether to chunk text content

True
chunk_size_words int

Number of words per text chunk

3
Source code in ccproxy/adapters/openai/streaming.py
def __init__(
    self,
    message_id: str | None = None,
    model: str = "claude-3-5-sonnet-20241022",
    created: int | None = None,
    enable_usage: bool = True,
    enable_tool_calls: bool = True,
    enable_text_chunking: bool = True,
    chunk_size_words: int = 3,
):
    """Initialize the Anthropic-to-OpenAI stream processor.

    Args:
        message_id: Response ID; generated if not provided.
        model: Model name for responses.
        created: Creation timestamp; current time if not provided.
        enable_usage: Whether to include usage information.
        enable_tool_calls: Whether to process tool calls.
        enable_text_chunking: Whether to chunk text content.
        chunk_size_words: Number of words per text chunk.
    """
    # Identity and metadata stamped onto every emitted chunk.
    self.message_id = message_id if message_id else generate_openai_response_id()
    self.model = model
    self.created = created if created else int(time.time())

    # Feature toggles.
    self.enable_usage = enable_usage
    self.enable_tool_calls = enable_tool_calls
    self.enable_text_chunking = enable_text_chunking
    self.chunk_size_words = chunk_size_words
    self.formatter = OpenAISSEFormatter()

    # Streaming state accumulated while processing chunks.
    self.role_sent = False
    self.accumulated_content = ""
    self.tool_calls: dict[str, dict[str, Any]] = {}
    self.usage_info: dict[str, int] | None = None
    # Extended-thinking block state.
    self.current_thinking_text = ""
    self.current_thinking_signature: str | None = None
    self.thinking_block_active = False

process_stream async

process_stream(claude_stream)

Process a Claude/Anthropic stream into OpenAI format.

Parameters:

Name Type Description Default
claude_stream AsyncIterator[dict[str, Any]]

Async iterator of Claude response chunks

required

Yields:

Type Description
AsyncIterator[str]

OpenAI-formatted SSE strings

Source code in ccproxy/adapters/openai/streaming.py
async def process_stream(
    self, claude_stream: AsyncIterator[dict[str, Any]]
) -> AsyncIterator[str]:
    """Convert a Claude/Anthropic chunk stream into OpenAI SSE strings.

    Args:
        claude_stream: Async iterator of Claude response chunks.

    Yields:
        OpenAI-formatted SSE strings, ending with a final chunk and the
        ``[DONE]`` sentinel; on failure, an error chunk then ``[DONE]``.
    """
    try:
        async for raw_chunk in claude_stream:
            async for formatted in self._process_chunk(raw_chunk):
                yield formatted

        # Attach usage to the final chunk only when enabled and collected.
        final_usage = (
            self.usage_info if (self.enable_usage and self.usage_info) else None
        )
        yield self.formatter.format_final_chunk(
            self.message_id,
            self.model,
            self.created,
            finish_reason="stop",
            usage=final_usage,
        )

        # Terminate the stream.
        yield self.formatter.format_done()

    except Exception as e:
        # Surface the failure to the client, then still close the stream.
        yield self.formatter.format_error_chunk(
            self.message_id, self.model, self.created, "error", str(e)
        )
        yield self.formatter.format_done()