ccproxy.llms.streaming.processors

Stream processing utilities for converting between different streaming formats.

This module provides stream processors that convert between different LLM streaming response formats (e.g., Anthropic to OpenAI, OpenAI to Anthropic).

AnthropicStreamProcessor

AnthropicStreamProcessor(
    model="claude-3-5-sonnet-20241022",
)

Processes OpenAI streaming data into Anthropic SSE format.

Parameters:

Name    Type    Description                 Default
model   str     Model name for responses    'claude-3-5-sonnet-20241022'
Source code in ccproxy/llms/streaming/processors.py
def __init__(self, model: str = "claude-3-5-sonnet-20241022"):
    """Initialize the stream processor.

    Args:
        model: Model name for responses
    """
    self.model = model
    self.formatter = AnthropicSSEFormatter()

process_stream async

process_stream(stream)

Process OpenAI-format streaming data into Anthropic SSE format.

Parameters:

Name     Type                             Description                                       Default
stream   AsyncIterator[dict[str, Any]]    Async iterator of OpenAI-style response chunks    required

Yields:

Type                 Description
AsyncIterator[str]   Anthropic-formatted SSE strings with proper event: lines
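
Each yielded string is one complete SSE event. For illustration (the payload values here are hypothetical; the data line carries the chunk's JSON as produced by AnthropicSSEFormatter, whose exact output is not shown in this excerpt), a content_block_delta chunk would render roughly as:

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}}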

Source code in ccproxy/llms/streaming/processors.py
async def process_stream(
    self, stream: AsyncIterator[dict[str, Any]]
) -> AsyncIterator[str]:
    """Process OpenAI-format streaming data into Anthropic SSE format.

    Args:
        stream: Async iterator of OpenAI-style response chunks

    Yields:
        Anthropic-formatted SSE strings with proper event: lines
    """
    message_started = False
    content_block_started = False

    async for chunk in stream:
        if not isinstance(chunk, dict):
            continue

        chunk_type = chunk.get("type")

        if chunk_type == "message_start":
            if not message_started:
                yield self.formatter.format_event("message_start", chunk)
                message_started = True

        elif chunk_type == "content_block_start":
            if not content_block_started:
                yield self.formatter.format_event("content_block_start", chunk)
                content_block_started = True

        elif chunk_type == "content_block_delta":
            yield self.formatter.format_event("content_block_delta", chunk)

        elif chunk_type == "ping":
            yield self.formatter.format_ping()

        elif chunk_type == "content_block_stop":
            yield self.formatter.format_event("content_block_stop", chunk)

        elif chunk_type == "message_delta":
            yield self.formatter.format_event("message_delta", chunk)

        elif chunk_type == "message_stop":
            yield self.formatter.format_event("message_stop", chunk)
            break
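
A minimal usage sketch. The fake_stream generator and its chunk payloads are hypothetical; the dispatch above only inspects each chunk's type field, so only those values are load-bearing:

import asyncio
from collections.abc import AsyncIterator
from typing import Any

from ccproxy.llms.streaming.processors import AnthropicStreamProcessor


async def fake_stream() -> AsyncIterator[dict[str, Any]]:
    # Payloads are illustrative; only the "type" key drives the dispatch above
    yield {"type": "message_start", "message": {"id": "msg_1", "role": "assistant"}}
    yield {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
    yield {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}}
    yield {"type": "content_block_stop", "index": 0}
    yield {"type": "message_delta", "delta": {"stop_reason": "end_turn"}}
    yield {"type": "message_stop"}  # terminates the loop via the break above


async def main() -> None:
    processor = AnthropicStreamProcessor()
    async for sse in processor.process_stream(fake_stream()):
        print(sse, end="")  # SSE strings carry their own trailing newlines


asyncio.run(main())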

OpenAIStreamProcessor

OpenAIStreamProcessor(
    message_id=None,
    model="claude-3-5-sonnet-20241022",
    created=None,
    enable_usage=True,
    enable_tool_calls=True,
    enable_thinking_serialization=None,
    output_format="sse",
)

Processes Anthropic/Claude streaming responses into OpenAI format.

Parameters:

Name                            Type                      Description                                                         Default
message_id                      str | None                Response ID, generated if not provided                              None
model                           str                       Model name for responses                                            'claude-3-5-sonnet-20241022'
created                         int | None                Creation timestamp, current time if not provided                    None
enable_usage                    bool                      Whether to include usage information                                True
enable_tool_calls               bool                      Whether to process tool calls                                       True
enable_thinking_serialization   bool | None               Whether to serialize thinking blocks; when None, resolved from      None
                                                          Settings.llm.openai_thinking_xml or environment toggles
output_format                   Literal['sse', 'dict']    Output format: "sse" for Server-Sent Events strings, "dict" for     'sse'
                                                          dict objects
Source code in ccproxy/llms/streaming/processors.py
Source code in ccproxy/llms/streaming/processors.py
def __init__(
    self,
    message_id: str | None = None,
    model: str = "claude-3-5-sonnet-20241022",
    created: int | None = None,
    enable_usage: bool = True,
    enable_tool_calls: bool = True,
    enable_thinking_serialization: bool | None = None,
    output_format: Literal["sse", "dict"] = "sse",
):
    """Initialize the stream processor.

    Args:
        message_id: Response ID, generated if not provided
        model: Model name for responses
        created: Creation timestamp, current time if not provided
        enable_usage: Whether to include usage information
        enable_tool_calls: Whether to process tool calls
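        enable_thinking_serialization: Whether to serialize thinking blocks; when None, resolved from Settings.llm.openai_thinking_xml or environment toggles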
        output_format: Output format - "sse" for Server-Sent Events strings, "dict" for dict objects
    """
    # Import here to avoid circular imports
    from ccproxy.llms.models.openai import generate_responses_id

    self.message_id = message_id or generate_responses_id()
    self.model = model
    self.created = created or int(time.time())
    self.enable_usage = enable_usage
    self.enable_tool_calls = enable_tool_calls
    self.output_format = output_format
    if enable_thinking_serialization is None:
        # Prefer service Settings.llm.openai_thinking_xml if available
        setting_val: bool | None = None
        try:
            from ccproxy.config.settings import Settings

            cfg = Settings.from_config()
            setting_val = bool(
                getattr(getattr(cfg, "llm", {}), "openai_thinking_xml", True)
            )
        except Exception:
            setting_val = None

        if setting_val is not None:
            self.enable_thinking_serialization = setting_val
        else:
            # Fallback to env-based toggle
            env_val = (
                os.getenv("LLM__OPENAI_THINKING_XML")
                or os.getenv("OPENAI_STREAM_ENABLE_THINKING_SERIALIZATION")
                or "true"
            ).lower()
            self.enable_thinking_serialization = env_val not in (
                "0",
                "false",
                "no",
                "off",
            )
    else:
        self.enable_thinking_serialization = enable_thinking_serialization
    self.formatter = OpenAISSEFormatter()

    # State tracking
    self.role_sent = False
    self.accumulated_content = ""
    self.tool_calls: dict[str, dict[str, Any]] = {}
    self.usage_info: dict[str, int] | None = None
    # Thinking block tracking
    self.current_thinking_text = ""
    self.current_thinking_signature: str | None = None
    self.thinking_block_active = False

process_stream async

process_stream(claude_stream)

Process a Claude/Anthropic stream into OpenAI format.

Parameters:

Name            Type                             Description                                 Default
claude_stream   AsyncIterator[dict[str, Any]]    Async iterator of Claude response chunks    required

Yields:

Type                                  Description
AsyncIterator[str | dict[str, Any]]   OpenAI-formatted SSE strings or dict objects based on output_format

Source code in ccproxy/llms/streaming/processors.py
async def process_stream(
    self, claude_stream: AsyncIterator[dict[str, Any]]
) -> AsyncIterator[str | dict[str, Any]]:
    """Process a Claude/Anthropic stream into OpenAI format.

    Args:
        claude_stream: Async iterator of Claude response chunks

    Yields:
        OpenAI-formatted SSE strings or dict objects based on output_format
    """
    # Get logger with request context at the start of the function
    logger = get_logger(__name__)

    try:
        chunk_count = 0
        processed_count = 0
        logger.debug(
            "openai_stream_processor_start",
            message_id=self.message_id,
            model=self.model,
            output_format=self.output_format,
            enable_usage=self.enable_usage,
            enable_tool_calls=self.enable_tool_calls,
            category="streaming_conversion",
            enable_thinking_serialization=self.enable_thinking_serialization,
        )

        async for chunk in claude_stream:
            chunk_count += 1
            chunk_type = chunk.get("type", "unknown")

            logger.trace(
                "openai_processor_input_chunk",
                chunk_number=chunk_count,
                chunk_type=chunk_type,
                category="format_detection",
            )

            async for sse_chunk in self._process_chunk(chunk):
                processed_count += 1
                yield sse_chunk

        logger.debug(
            "openai_stream_complete",
            total_chunks=chunk_count,
            processed_chunks=processed_count,
            message_id=self.message_id,
            category="streaming_conversion",
        )

        # Send final chunk
        if self.usage_info and self.enable_usage:
            yield self._format_chunk_output(
                finish_reason="stop",
                usage=self.usage_info,
            )
        else:
            yield self._format_chunk_output(finish_reason="stop")

        # Send DONE event (only for SSE format)
        if self.output_format == "sse":
            yield self.formatter.format_done()

    except (OSError, PermissionError) as e:
        logger.error("stream_processing_io_error", error=str(e), exc_info=e)
        # Send error chunk for IO errors
        if self.output_format == "sse":
            yield self.formatter.format_error_chunk(
                self.message_id,
                self.model,
                self.created,
                "error",
                f"IO error: {str(e)}",
            )
            yield self.formatter.format_done()
        else:
            # Dict format error
            yield self._create_chunk_dict(finish_reason="error")
    except Exception as e:
        logger.error("stream_processing_error", error=str(e), exc_info=e)
        # Send error chunk
        if self.output_format == "sse":
            yield self.formatter.format_error_chunk(
                self.message_id, self.model, self.created, "error", str(e)
            )
            yield self.formatter.format_done()
        else:
            # Dict format error
            yield self._create_chunk_dict(finish_reason="error")
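
A usage sketch mirroring the Anthropic example above. The claude_chunks generator and its payloads are hypothetical, and _process_chunk, which performs the per-chunk conversion, is not shown in this excerpt:

import asyncio
from collections.abc import AsyncIterator
from typing import Any

from ccproxy.llms.streaming.processors import OpenAIStreamProcessor


async def claude_chunks() -> AsyncIterator[dict[str, Any]]:
    # Hypothetical Claude-style events; real payloads carry more fields
    yield {"type": "message_start", "message": {"id": "msg_1"}}
    yield {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
    yield {"type": "message_stop"}


async def main() -> None:
    processor = OpenAIStreamProcessor(output_format="sse")
    async for item in processor.process_stream(claude_chunks()):
        print(item, end="")  # SSE strings here; dicts with output_format="dict"


asyncio.run(main())

Per the branches above, the SSE path appends a final finish_reason="stop" chunk and a terminal DONE event, while the dict path yields the final chunk dict with no DONE marker.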