Skip to content

ccproxy.llms.formatters.openai_to_anthropic

ccproxy.llms.formatters.openai_to_anthropic

Facade module exposing OpenAI→Anthropic formatter entry points.

OpenAIChatToAnthropicStreamAdapter

Stateful adapter for OpenAI Chat → Anthropic streaming.

OpenAIResponsesToAnthropicStreamAdapter

Stateful adapter for OpenAI Responses → Anthropic streaming.

convert__openai_to_anthropic__error

convert__openai_to_anthropic__error(error)

Convert an OpenAI error payload to the Anthropic envelope.

Source code in ccproxy/llms/formatters/openai_to_anthropic/errors.py
def convert__openai_to_anthropic__error(error: BaseModel) -> BaseModel:
    """Wrap an OpenAI-style error model in an Anthropic ``ErrorResponse``.

    Three input shapes are handled, in this order:

    1. ``openai_models.ErrorResponse``: the OpenAI error type (``"api_error"``
       when unset) is looked up in ``OPENAI_TO_ANTHROPIC_ERROR_TYPE`` and the
       result selects ``InvalidRequestError``, ``RateLimitError`` or
       ``APIError``.
    2. Any object exposing ``error.message``: wrapped in an ``APIError``.
    3. Anything else: the message is taken from ``error.message``, else from
       the ``model_dump()`` output, else a fixed placeholder; wrapped in an
       ``APIError``.

    Args:
        error: The error payload to convert.

    Returns:
        An ``anthropic_models.ErrorResponse`` carrying the converted error.
    """
    if isinstance(error, openai_models.ErrorResponse):
        detail = error.error
        mapped_type = OPENAI_TO_ANTHROPIC_ERROR_TYPE.get(
            detail.type or "api_error", "api_error"
        )

        # Pick the Anthropic error class first, then build it once.
        if mapped_type == "invalid_request_error":
            error_cls = anthropic_models.InvalidRequestError
        elif mapped_type == "rate_limit_error":
            error_cls = anthropic_models.RateLimitError
        else:
            error_cls = anthropic_models.APIError

        return anthropic_models.ErrorResponse(
            error=error_cls(message=detail.message)
        )

    # Duck-typed envelope: something with a nested ``error.message``.
    if hasattr(error, "error") and hasattr(error.error, "message"):
        return anthropic_models.ErrorResponse(
            error=anthropic_models.APIError(message=error.error.message)
        )

    # Last resort: find a message wherever one is available.
    if hasattr(error, "message"):
        message = error.message
    elif hasattr(error, "model_dump"):
        dumped = error.model_dump()
        message = str(dumped.get("message", dumped))
    else:
        message = "Unknown error occurred"

    return anthropic_models.ErrorResponse(
        error=anthropic_models.APIError(message=message)
    )

convert__openai_chat_to_anthropic_message__request async

convert__openai_chat_to_anthropic_message__request(request)

Convert OpenAI ChatCompletionRequest to Anthropic CreateMessageRequest using typed models.

Source code in ccproxy/llms/formatters/openai_to_anthropic/requests.py
def _load_tool_call_arguments(raw_args: Any) -> Any:
    """Decode an OpenAI tool-call ``arguments`` value into a ``tool_use`` input.

    ``None`` and empty/whitespace-only strings become ``{}`` instead of
    failing in ``json.loads``; a dict is passed through untouched. Any other
    value is stringified and parsed as JSON, so malformed JSON still raises
    ``json.JSONDecodeError``.
    """
    if raw_args is None:
        return {}
    if isinstance(raw_args, dict):
        return raw_args
    text = str(raw_args)
    if not text.strip():
        return {}
    return json.loads(text)


def _image_block_from_data_url(url: Any) -> dict[str, Any] | None:
    """Build an Anthropic base64 image block from a ``data:`` URL.

    Returns ``None`` when ``url`` is not a string, does not start with
    ``data:``, or has no ``,`` separating the header from the payload.
    """
    if not isinstance(url, str) or not url.startswith("data:"):
        return None
    header, separator, b64data = url.partition(",")
    if not separator:
        return None
    # ``header`` starts with "data:", so the split always yields two items.
    media_type = header.split(";")[0].split(":", 1)[1]
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": b64data,
        },
    }


def _user_content_from_parts(parts: list[Any]) -> str | list[dict[str, Any]]:
    """Convert OpenAI user content parts into Anthropic message content.

    Parts may be dicts or objects with ``type``/``text``/``image_url``
    attributes. Text parts are joined with a single space; only ``data:``
    image URLs are converted, other parts are ignored.

    Returns:
        A block list (text first, then images) when any image was converted,
        a one-element text block list when several text parts were given,
        the bare string for exactly one text part, and ``""`` otherwise.
    """
    image_blocks: list[dict[str, Any]] = []
    texts: list[str] = []

    for part in parts:
        if isinstance(part, dict):
            ptype = part.get("type")
            text = part.get("text") if ptype == "text" else None
            image_info = part.get("image_url") if ptype == "image_url" else None
            url = image_info.get("url") if isinstance(image_info, dict) else None
        elif hasattr(part, "type"):
            ptype = part.type
            text = getattr(part, "text", None) if ptype == "text" else None
            image_info = (
                getattr(part, "image_url", None) if ptype == "image_url" else None
            )
            url = image_info.url if image_info else None
        else:
            continue

        if isinstance(text, str):
            texts.append(text)
        image_block = _image_block_from_data_url(url)
        if image_block is not None:
            image_blocks.append(image_block)

    if image_blocks:
        # With images present the content is always a block list.
        if texts:
            return [{"type": "text", "text": " ".join(texts)}, *image_blocks]
        return image_blocks
    if len(texts) > 1:
        return [{"type": "text", "text": " ".join(texts)}]
    if texts:
        return texts[0]
    return ""


async def convert__openai_chat_to_anthropic_message__request(
    request: openai_models.ChatCompletionRequest,
) -> anthropic_models.CreateMessageRequest:
    """Convert OpenAI ChatCompletionRequest to Anthropic CreateMessageRequest using typed models.

    Conversion steps:

    * ``max_tokens`` comes from ``max_completion_tokens``, then the deprecated
      ``max_tokens`` field, then ``DEFAULT_MAX_TOKENS``.
    * ``system`` messages are collected into the top-level ``system`` string;
      several system messages are joined with a blank line rather than the
      last one replacing the earlier ones.
    * ``assistant`` tool calls become ``tool_use`` blocks and ``tool``
      messages become ``user`` messages holding a ``tool_result`` block.
    * ``user`` list content is flattened by ``_user_content_from_parts``.
    * ``response_format`` JSON modes are expressed as extra system guidance.
    * ``tools``, ``tool_choice``/``parallel_tool_calls`` and the thinking
      configuration are mapped onto their Anthropic counterparts.

    Args:
        request: The OpenAI chat completion request to convert.

    Returns:
        The payload validated as ``anthropic_models.CreateMessageRequest``.

    Raises:
        json.JSONDecodeError: If a tool call carries non-empty arguments that
            are not valid JSON.
    """
    model = request.model.strip() if request.model else ""

    # Determine max tokens
    max_tokens = request.max_completion_tokens
    if max_tokens is None:
        # Access deprecated field with warning suppressed for backward compatibility
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            max_tokens = request.max_tokens
    if max_tokens is None:
        max_tokens = DEFAULT_MAX_TOKENS

    system_value: str | None = None
    out_messages: list[dict[str, Any]] = []

    for msg in request.messages or []:
        role = msg.role
        content = msg.content
        tool_calls = getattr(msg, "tool_calls", None)

        if role == "system":
            new_system: str | None = None
            if isinstance(content, str):
                new_system = content
            elif isinstance(content, list):
                texts = [
                    part.text
                    for part in content
                    if hasattr(part, "type")
                    and part.type == "text"
                    and hasattr(part, "text")
                ]
                new_system = " ".join([t for t in texts if t]) or None
            if new_system is not None:
                # Keep earlier system prompts instead of overwriting them.
                if system_value and new_system:
                    system_value = f"{system_value}\n\n{new_system}"
                else:
                    system_value = system_value or new_system
        elif role == "assistant":
            if tool_calls:
                blocks: list[dict[str, Any]] = []
                if content:  # Add text content if present
                    blocks.append({"type": "text", "text": str(content)})
                for tc in tool_calls:
                    func_info = tc.function
                    tool_name = func_info.name if func_info else None
                    tool_args = func_info.arguments if func_info else None
                    blocks.append(
                        {
                            "type": "tool_use",
                            "id": tc.id,
                            "name": str(tool_name) if tool_name is not None else "",
                            "input": _load_tool_call_arguments(tool_args),
                        }
                    )
                out_messages.append({"role": "assistant", "content": blocks})
            elif content is not None:
                out_messages.append({"role": "assistant", "content": content})
        elif role == "tool":
            out_messages.append(
                {
                    "role": "user",  # Anthropic uses 'user' role for tool results
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": getattr(msg, "tool_call_id", None),
                            "content": str(content),
                        }
                    ],
                }
            )
        elif role == "user":
            if content is None:
                continue
            if isinstance(content, list):
                content = _user_content_from_parts(content)
            out_messages.append({"role": "user", "content": content})

    # Sanitize tool_result blocks to ensure they have matching tool_use blocks
    out_messages = _sanitize_tool_results(out_messages)

    payload_data: dict[str, Any] = {
        "model": model,
        "messages": out_messages,
        "max_tokens": max_tokens,
    }

    # Inject system guidance for response_format JSON modes
    resp_fmt = request.response_format
    if resp_fmt is not None:
        inject: str | None = None
        if resp_fmt.type == "json_object":
            inject = (
                "Respond ONLY with a valid JSON object. "
                "Do not include any additional text, markdown, or explanation."
            )
        elif resp_fmt.type == "json_schema" and hasattr(resp_fmt, "json_schema"):
            schema = resp_fmt.json_schema
            try:
                if schema is not None:
                    schema_str = json.dumps(
                        schema.model_dump()
                        if hasattr(schema, "model_dump")
                        else schema,
                        ensure_ascii=False,
                        separators=(",", ":"),
                    )
                else:
                    schema_str = "{}"
            except Exception:
                # Best effort: an unserializable schema is embedded via str().
                schema_str = str(schema or {})
            inject = (
                "Respond ONLY with a JSON object that strictly conforms to this JSON Schema:\n"
                f"{schema_str}"
            )
        if inject:
            if system_value:
                system_value = f"{system_value}\n\n{inject}"
            else:
                system_value = inject

    # system_value is only ever assigned a str or None above, so no further
    # normalization is needed before placing it in the payload.
    if system_value is not None:
        payload_data["system"] = system_value
    if request.stream is not None:
        payload_data["stream"] = request.stream

    # Tools mapping (OpenAI function tools -> Anthropic tool definitions)
    anth_tools: list[dict[str, Any]] = []
    for t in request.tools or []:
        if t.type == "function" and t.function is not None:
            fn = t.function
            anth_tools.append(
                {
                    "type": "custom",
                    "name": fn.name,
                    "description": fn.description,
                    "input_schema": fn.parameters.model_dump()
                    if hasattr(fn.parameters, "model_dump")
                    else (fn.parameters or {}),
                }
            )
    if anth_tools:
        payload_data["tools"] = anth_tools

    # tool_choice mapping
    tool_choice = request.tool_choice
    parallel_tool_calls = request.parallel_tool_calls
    disable_parallel = None
    if isinstance(parallel_tool_calls, bool):
        disable_parallel = not parallel_tool_calls

    if tool_choice is not None:
        anth_choice: dict[str, Any] | None = None
        if isinstance(tool_choice, str):
            if tool_choice == "none":
                anth_choice = {"type": "none"}
            elif tool_choice == "auto":
                anth_choice = {"type": "auto"}
            elif tool_choice == "required":
                anth_choice = {"type": "any"}
        elif isinstance(tool_choice, dict):
            # Handle dict input like {"type": "function", "function": {"name": "search"}}
            if tool_choice.get("type") == "function" and isinstance(
                tool_choice.get("function"), dict
            ):
                anth_choice = {
                    "type": "tool",
                    "name": tool_choice["function"].get("name"),
                }
        elif hasattr(tool_choice, "type") and hasattr(tool_choice, "function"):
            # e.g., ChatCompletionNamedToolChoice pydantic model
            if tool_choice.type == "function" and tool_choice.function is not None:
                anth_choice = {
                    "type": "tool",
                    "name": tool_choice.function.name,
                }
        if anth_choice is not None:
            # The parallel-use flag is only attached to choices that can
            # actually invoke a tool.
            if disable_parallel is not None and anth_choice["type"] in {
                "auto",
                "any",
                "tool",
            }:
                anth_choice["disable_parallel_tool_use"] = disable_parallel
            payload_data["tool_choice"] = anth_choice

    # Thinking configuration
    thinking_cfg = derive_thinking_config(model, request)
    if thinking_cfg is not None:
        payload_data["thinking"] = thinking_cfg
        # Ensure token budget fits under max_tokens
        budget = thinking_cfg.get("budget_tokens", 0)
        if isinstance(budget, int) and max_tokens <= budget:
            payload_data["max_tokens"] = budget + 64
        # Temperature constraint when thinking enabled
        payload_data["temperature"] = 1.0

    # Validate against Anthropic model to ensure shape
    return anthropic_models.CreateMessageRequest.model_validate(payload_data)

convert__openai_chat_to_anthropic_messages__response

convert__openai_chat_to_anthropic_messages__response(
    response,
)

Convert OpenAI ChatCompletionResponse to Anthropic MessageResponse.

Source code in ccproxy/llms/formatters/openai_to_anthropic/responses.py
def _get_attr_or_item(obj: Any, key: str) -> Any:
    """Read ``key`` from ``obj`` as an attribute, falling back to a dict lookup."""
    value = getattr(obj, key, None)
    if value is None and isinstance(obj, dict):
        value = obj.get(key)
    return value


def convert__openai_chat_to_anthropic_messages__response(
    response: openai_models.ChatCompletionResponse,
) -> anthropic_models.MessageResponse:
    """Convert OpenAI ChatCompletionResponse to Anthropic MessageResponse.

    Only the first choice is converted. Its text (a string, or the
    concatenated ``text`` parts of a list given as dicts or objects) becomes
    a leading ``TextBlock``; each entry of ``tool_calls`` becomes a
    ``ToolUseBlock`` whose arguments go through
    ``strict_parse_tool_arguments``. A legacy single function call is read
    from ``function_call`` (the deprecated OpenAI field name) or, as before,
    from ``function``, and is appended as one more ``ToolUseBlock``.

    Args:
        response: The OpenAI chat completion response to convert.

    Returns:
        An ``anthropic_models.MessageResponse`` with text first, then tool
        blocks, plus the mapped stop reason and usage.
    """
    text_content = ""
    finish_reason = None
    tool_contents: list[anthropic_models.ToolUseBlock] = []

    if response.choices:
        choice = response.choices[0]
        finish_reason = getattr(choice, "finish_reason", None)
        msg = getattr(choice, "message", None)
        if msg is not None:
            content_val = getattr(msg, "content", None)
            if isinstance(content_val, str):
                text_content = content_val
            elif isinstance(content_val, list):
                # Accept both dict parts and object parts, as the request
                # converter does for incoming content.
                parts: list[str] = []
                for part in content_val:
                    if isinstance(part, dict):
                        ptype, t = part.get("type"), part.get("text")
                    else:
                        ptype = getattr(part, "type", None)
                        t = getattr(part, "text", None)
                    if ptype == "text" and isinstance(t, str):
                        parts.append(t)
                text_content = "".join(parts)

            # Extract OpenAI Chat tool calls (strict JSON parsing)
            tool_calls = getattr(msg, "tool_calls", None)
            if isinstance(tool_calls, list):
                for i, tc in enumerate(tool_calls):
                    fn = _get_attr_or_item(tc, "function")
                    if not fn:
                        continue
                    name = _get_attr_or_item(fn, "name")
                    args = strict_parse_tool_arguments(
                        _get_attr_or_item(fn, "arguments")
                    )
                    tool_id = _get_attr_or_item(tc, "id")
                    tool_contents.append(
                        anthropic_models.ToolUseBlock(
                            type="tool_use",
                            id=tool_id or f"call_{i}",
                            name=name or "function",
                            input=args,
                        )
                    )

            # Legacy single function: OpenAI names the deprecated field
            # ``function_call``; ``function`` is still honoured for callers
            # that relied on the previous lookup.
            legacy_fn = getattr(msg, "function_call", None) or getattr(
                msg, "function", None
            )
            if legacy_fn:
                name = _get_attr_or_item(legacy_fn, "name")
                args = strict_parse_tool_arguments(
                    _get_attr_or_item(legacy_fn, "arguments")
                )
                tool_contents.append(
                    anthropic_models.ToolUseBlock(
                        type="tool_use",
                        # "call_0" when it is the only call; otherwise the
                        # next index, so it cannot repeat a fallback id
                        # generated for ``tool_calls`` above.
                        id=f"call_{len(tool_contents)}",
                        name=name or "function",
                        input=args,
                    )
                )

    content_blocks: list[anthropic_models.ResponseContentBlock] = []
    if text_content:
        content_blocks.append(
            anthropic_models.TextBlock(type="text", text=text_content)
        )
    # Append tool blocks after text (order matches Responses path patterns)
    content_blocks.extend(tool_contents)

    # Map usage via shared utility
    usage = openai_usage_to_anthropic_usage(getattr(response, "usage", None))

    stop_reason = map_openai_finish_to_anthropic_stop(finish_reason)

    return anthropic_models.MessageResponse(
        id=getattr(response, "id", "msg_1") or "msg_1",
        type="message",
        role="assistant",
        model=getattr(response, "model", "") or "",
        content=content_blocks,
        stop_reason=stop_reason,
        stop_sequence=None,
        usage=usage,
    )

convert__openai_chat_to_anthropic_messages__stream

convert__openai_chat_to_anthropic_messages__stream(stream)

Translate OpenAI ChatCompletion streams into Anthropic message events.

Source code in ccproxy/llms/formatters/openai_to_anthropic/streams.py
def convert__openai_chat_to_anthropic_messages__stream(
    stream: AsyncIterator[Any],
) -> AsyncGenerator[anthropic_models.MessageStreamEvent, None]:
    """Return Anthropic message events for an OpenAI ChatCompletion stream.

    A new ``OpenAIChatToAnthropicStreamAdapter`` is created for each call and
    the result of its ``run(stream)`` is handed back unchanged.

    Args:
        stream: The OpenAI ChatCompletion stream to translate.

    Returns:
        Whatever ``OpenAIChatToAnthropicStreamAdapter.run`` returns for
        ``stream``.
    """
    return OpenAIChatToAnthropicStreamAdapter().run(stream)

convert__openai_responses_to_anthropic_messages__stream async

convert__openai_responses_to_anthropic_messages__stream(
    stream,
)

Translate OpenAI Responses streaming events into Anthropic message events.

Source code in ccproxy/llms/formatters/openai_to_anthropic/streams.py
async def convert__openai_responses_to_anthropic_messages__stream(
    stream: AsyncIterator[Any],
) -> AsyncGenerator[anthropic_models.MessageStreamEvent, None]:
    """Yield Anthropic message events for an OpenAI Responses event stream.

    A new ``OpenAIResponsesToAnthropicStreamAdapter`` is created for each
    call; every event produced by its ``run(stream)`` is re-yielded as-is.

    Args:
        stream: The OpenAI Responses streaming events to translate.

    Yields:
        Each event produced by the adapter, in order.
    """
    translated_events = OpenAIResponsesToAnthropicStreamAdapter().run(stream)
    async for anthropic_event in translated_events:
        yield anthropic_event