Skip to content

ccproxy.streaming.sse_parser

ccproxy.streaming.sse_parser

Helpers for incrementally parsing server-sent events (SSE).

SSEStreamParser

SSEStreamParser()

Accumulate SSE fragments and yield decoded data: payloads.

The parser keeps track of partial lines and events across feed calls so callers can push raw provider chunks (str or bytes) and only receive payloads when a full SSE event has been received. data: [DONE] sentinel events are filtered out automatically.

Source code in ccproxy/streaming/sse_parser.py
def __init__(self) -> None:
    self._line_remainder: str = ""
    self._event_lines: list[str] = []
    self._errors: list[tuple[str, Exception]] = []

feed

feed(chunk)

Process a streaming fragment and return decoded JSON payloads.

Parameters:

Name Type Description Default
chunk str | bytes | None

Raw chunk from the provider. bytes inputs are decoded using UTF-8. None or empty values yield no events.

required

Returns:

Type Description
list[Any]

List of decoded JSON payloads for completed events. [DONE]

list[Any]

sentinels are omitted.

Source code in ccproxy/streaming/sse_parser.py
def feed(self, chunk: str | bytes | None) -> list[Any]:
    """Process a streaming fragment and return decoded JSON payloads.

    Args:
        chunk: Raw chunk from the provider. ``bytes`` inputs are decoded
            using UTF-8. ``None`` or empty values yield no events.

    Returns:
        List of decoded JSON payloads for completed events. ``[DONE]``
        sentinels are omitted.
    """

    if not chunk:
        return []

    if isinstance(chunk, bytes):
        chunk = chunk.decode("utf-8", errors="ignore")

    if not chunk:
        return []

    chunk = chunk.replace("\r\n", "\n").replace("\r", "\n")
    buffered = f"{self._line_remainder}{chunk}"

    lines = buffered.split("\n")
    if buffered.endswith("\n"):
        self._line_remainder = ""
    else:
        self._line_remainder = lines.pop()

    completed: list[Any] = []

    for line in lines:
        if line == "":
            payload = self._finalize_event()
            if payload:
                completed.append(payload)
            continue

        self._event_lines.append(line)

    return completed

flush

flush()

Return any buffered payload when the stream ends.

Source code in ccproxy/streaming/sse_parser.py
def flush(self) -> list[Any]:
    """Return any buffered payload when the stream ends."""

    if self._line_remainder:
        self._event_lines.append(self._line_remainder)
        self._line_remainder = ""

    payload = self._finalize_event()
    return [payload] if payload else []

consume_errors

consume_errors()

Return and clear parsing errors captured since the last call.

Source code in ccproxy/streaming/sse_parser.py
def consume_errors(self) -> list[tuple[str, Exception]]:
    """Return and clear parsing errors captured since the last call."""

    errors = self._errors
    self._errors = []
    return errors