Skip to content

ccproxy.claude_sdk

ccproxy.claude_sdk

Claude SDK integration module.

ClaudeSDKClient

ClaudeSDKClient(settings=None, session_manager=None)

Minimal Claude SDK client wrapper that handles core SDK interactions.

This class provides a clean interface to the Claude Code SDK while handling error translation and basic query execution. Supports both stateless query() calls and pooled connection reuse for improved performance.

Parameters:

Name Type Description Default
settings Settings | None

Application settings for session pool configuration

None
session_manager SessionManager | None

Optional SessionManager instance for dependency injection

None
Source code in ccproxy/claude_sdk/client.py
def __init__(
    self,
    settings: Settings | None = None,
    session_manager: SessionManager | None = None,
) -> None:
    """Initialize the Claude SDK client.

    Args:
        settings: Application settings for session pool configuration
        session_manager: Optional SessionManager instance for dependency injection
    """
    self._last_api_call_time_ms: float = 0.0
    self._settings = settings
    self._session_manager = session_manager

query_completion async

query_completion(
    message, options, request_id=None, session_id=None
)

Execute a query using the Claude Code SDK and return a StreamHandle.

Parameters:

Name Type Description Default
message SDKMessage

SDKMessage to send to Claude SDK

required
options ClaudeCodeOptions

Claude Code options configuration

required
request_id str | None

Optional request ID for correlation

None
session_id str | None

Optional session ID for conversation continuity

None

Returns:

Type Description
StreamHandle

StreamHandle that can create listeners for the stream

Raises:

Type Description
ClaudeSDKError

If the query fails

Source code in ccproxy/claude_sdk/client.py
async def query_completion(
    self,
    message: SDKMessage,
    options: ClaudeCodeOptions,
    request_id: str | None = None,
    session_id: str | None = None,
) -> StreamHandle:
    """
    Execute a query using the Claude Code SDK and return a StreamHandle.

    Args:
        message: SDKMessage to send to Claude SDK
        options: Claude Code options configuration
        request_id: Optional request ID for correlation
        session_id: Optional session ID for conversation continuity

    Returns:
        StreamHandle that can create listeners for the stream

    Raises:
        ClaudeSDKError: If the query fails
    """
    # Determine routing strategy
    if self._should_use_session_pool(session_id):
        return await self._create_session_pool_stream_handle(
            message, options, request_id, session_id
        )
    else:
        return await self._create_direct_stream_handle(
            message, options, request_id, session_id
        )

validate_health async

validate_health()

Validate that the Claude SDK is healthy.

Returns:

Type Description
bool

True if healthy, False otherwise

Source code in ccproxy/claude_sdk/client.py
async def validate_health(self) -> bool:
    """
    Validate that the Claude SDK is healthy.

    Returns:
        True if healthy, False otherwise
    """
    try:
        logger.debug("health_check_start", component="claude_sdk")

        # Simple health check - the SDK is available if we can import it
        # More sophisticated checks could be added here
        is_healthy = True

        logger.debug(
            "health_check_completed", component="claude_sdk", healthy=is_healthy
        )
        return is_healthy
    except Exception as e:
        logger.error(
            "health_check_failed",
            component="claude_sdk",
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

interrupt_session async

interrupt_session(session_id)

Interrupt a specific session due to client disconnection.

Parameters:

Name Type Description Default
session_id str

The session ID to interrupt

required

Returns:

Type Description
bool

True if session was found and interrupted, False otherwise

Source code in ccproxy/claude_sdk/client.py
async def interrupt_session(self, session_id: str) -> bool:
    """Interrupt a specific session due to client disconnection.

    Args:
        session_id: The session ID to interrupt

    Returns:
        True if session was found and interrupted, False otherwise
    """
    logger.debug("sdk_client_interrupt_session_started", session_id=session_id)
    if self._session_manager:
        logger.info(
            "client_interrupt_session_requested",
            session_id=session_id,
            has_session_manager=True,
        )
        return await self._session_manager.interrupt_session(session_id)
    else:
        logger.warning(
            "client_interrupt_session_no_session_manager",
            session_id=session_id,
        )
        return False

close async

close()

Close the client and cleanup resources.

Source code in ccproxy/claude_sdk/client.py
async def close(self) -> None:
    """Close the client and cleanup resources."""
    # Claude Code SDK doesn't require explicit cleanup
    pass

MessageConverter

Handles conversion between Anthropic API format and Claude SDK format.

format_messages_to_prompt staticmethod

format_messages_to_prompt(messages)

Convert Anthropic messages format to a single prompt string.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of messages in Anthropic format

required

Returns:

Type Description
str

Single prompt string formatted for Claude SDK

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def format_messages_to_prompt(messages: list[dict[str, Any]]) -> str:
    """
    Convert Anthropic messages format to a single prompt string.

    Args:
        messages: List of messages in Anthropic format

    Returns:
        Single prompt string formatted for Claude SDK
    """
    prompt_parts = []

    for message in messages:
        role = message.get("role", "")
        content = message.get("content", "")

        if isinstance(content, list):
            # Handle content blocks
            text_parts = []
            for block in content:
                if block.get("type") == "text":
                    text_parts.append(block.get("text", ""))
            content = " ".join(text_parts)

        if role == "user":
            prompt_parts.append(f"Human: {content}")
        elif role == "assistant":
            prompt_parts.append(f"Assistant: {content}")
        elif role == "system":
            # System messages are handled via options
            continue

    return "\n\n".join(prompt_parts)

convert_to_anthropic_response staticmethod

convert_to_anthropic_response(
    assistant_message,
    result_message,
    model,
    mode=FORWARD,
    pretty_format=True,
)

Convert Claude SDK messages to Anthropic API response format.

Parameters:

Name Type Description Default
assistant_message AssistantMessage

The assistant message from Claude SDK

required
result_message ResultMessage

The result message from Claude SDK

required
model str

The model name used

required
mode SDKMessageMode

System message handling mode (forward, ignore, formatted)

FORWARD
pretty_format bool

Whether to use pretty formatting (true: indented JSON with newlines, false: compact with escaped content)

True

Returns:

Type Description
MessageResponse

Response in Anthropic API format

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def convert_to_anthropic_response(
    assistant_message: sdk_models.AssistantMessage,
    result_message: sdk_models.ResultMessage,
    model: str,
    mode: SDKMessageMode = SDKMessageMode.FORWARD,
    pretty_format: bool = True,
) -> "MessageResponse":
    """
    Convert Claude SDK messages to Anthropic API response format.

    Args:
        assistant_message: The assistant message from Claude SDK
        result_message: The result message from Claude SDK
        model: The model name used
        mode: System message handling mode (forward, ignore, formatted)
        pretty_format: Whether to use pretty formatting (true: indented JSON with newlines, false: compact with escaped content)

    Returns:
        Response in Anthropic API format
    """
    # Extract token usage from result message
    usage = result_message.usage_model

    # Log token extraction for debugging
    # logger.debug(
    #     "assistant_message_content",
    #     content_blocks=[block.type for block in assistant_message.content],
    #     content_count=len(assistant_message.content),
    # )

    logger.debug(
        "token_usage_extracted",
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cache_read_tokens=usage.cache_read_input_tokens,
        cache_write_tokens=usage.cache_creation_input_tokens,
        source="claude_sdk",
    )

    # Build usage information
    usage_info = usage.model_dump(mode="json")

    # Add cost information if available
    if result_message.total_cost_usd is not None:
        usage_info["cost_usd"] = result_message.total_cost_usd

    # Convert content blocks to Anthropic format, preserving thinking blocks
    content_blocks = []

    for block in assistant_message.content:
        if isinstance(block, sdk_models.TextBlock):
            # Handle text content directly without thinking block parsing
            text = block.text
            if mode == SDKMessageMode.FORMATTED:
                escaped_text = MessageConverter._escape_content_for_xml(
                    text, pretty_format
                )
                formatted_text = (
                    f"<text>\n{escaped_text}\n</text>\n"
                    if pretty_format
                    else f"<text>{escaped_text}</text>"
                )
                content_blocks.append({"type": "text", "text": formatted_text})
            else:
                content_blocks.append({"type": "text", "text": text})

        elif isinstance(block, sdk_models.ToolUseBlock):
            if mode == SDKMessageMode.FORWARD:
                content_blocks.append(block.to_sdk_block())
            elif mode == SDKMessageMode.FORMATTED:
                tool_data = block.model_dump(mode="json")
                formatted_json = MessageConverter._format_json_data(
                    tool_data, pretty_format
                )
                escaped_json = MessageConverter._escape_content_for_xml(
                    formatted_json, pretty_format
                )
                formatted_text = (
                    f"<tool_use_sdk>\n{escaped_json}\n</tool_use_sdk>\n"
                    if pretty_format
                    else f"<tool_use_sdk>{escaped_json}</tool_use_sdk>"
                )
                content_blocks.append({"type": "text", "text": formatted_text})

        elif isinstance(block, sdk_models.ToolResultBlock):
            if mode == SDKMessageMode.FORWARD:
                content_blocks.append(block.to_sdk_block())
            elif mode == SDKMessageMode.FORMATTED:
                tool_result_data = block.model_dump(mode="json")
                formatted_json = MessageConverter._format_json_data(
                    tool_result_data, pretty_format
                )
                escaped_json = MessageConverter._escape_content_for_xml(
                    formatted_json, pretty_format
                )
                formatted_text = (
                    f"<tool_result_sdk>\n{escaped_json}\n</tool_result_sdk>\n"
                    if pretty_format
                    else f"<tool_result_sdk>{escaped_json}</tool_result_sdk>"
                )
                content_blocks.append({"type": "text", "text": formatted_text})

        elif isinstance(block, sdk_models.ThinkingBlock):
            if mode == SDKMessageMode.FORWARD:
                thinking_block = {
                    "type": "thinking",
                    "thinking": block.thinking,
                }
                if block.signature is not None:
                    thinking_block["signature"] = block.signature
                content_blocks.append(thinking_block)
            elif mode == SDKMessageMode.FORMATTED:
                # Format thinking block with signature in XML tag attribute
                signature_attr = (
                    f' signature="{block.signature}"' if block.signature else ""
                )
                if pretty_format:
                    escaped_text = MessageConverter._escape_content_for_xml(
                        block.thinking, pretty_format
                    )
                    formatted_text = (
                        f"<thinking{signature_attr}>\n{escaped_text}\n</thinking>\n"
                    )
                else:
                    escaped_text = MessageConverter._escape_content_for_xml(
                        block.thinking, pretty_format
                    )
                    formatted_text = (
                        f"<thinking{signature_attr}>{escaped_text}</thinking>"
                    )
                content_blocks.append({"type": "text", "text": formatted_text})

    return MessageResponse.model_validate(
        {
            "id": f"msg_{result_message.session_id}",
            "type": "message",
            "role": "assistant",
            "content": content_blocks,
            "model": model,
            "stop_reason": result_message.stop_reason,
            "stop_sequence": None,
            "usage": usage_info,
        }
    )

create_streaming_start_chunks staticmethod

create_streaming_start_chunks(
    message_id, model, input_tokens=0
)

Create the initial streaming chunks for Anthropic API format.

Parameters:

Name Type Description Default
message_id str

The message ID

required
model str

The model name

required
input_tokens int

Number of input tokens for the request

0

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of tuples (event_type, chunk) for initial streaming chunks

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def create_streaming_start_chunks(
    message_id: str, model: str, input_tokens: int = 0
) -> list[tuple[str, dict[str, Any]]]:
    """
    Create the initial streaming chunks for Anthropic API format.

    Args:
        message_id: The message ID
        model: The model name
        input_tokens: Number of input tokens for the request

    Returns:
        List of tuples (event_type, chunk) for initial streaming chunks
    """
    return [
        # First, send message_start with event type
        (
            "message_start",
            {
                "type": "message_start",
                "message": {
                    "id": message_id,
                    "type": "message",
                    "role": "assistant",
                    "model": model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {
                        "input_tokens": input_tokens,
                        "cache_creation_input_tokens": 0,
                        "cache_read_input_tokens": 0,
                        "output_tokens": 1,
                        "service_tier": "standard",
                    },
                },
            },
        ),
    ]

create_streaming_delta_chunk staticmethod

create_streaming_delta_chunk(text)

Create a streaming delta chunk for Anthropic API format.

Parameters:

Name Type Description Default
text str

The text content to include

required

Returns:

Type Description
tuple[str, dict[str, Any]]

Tuple of (event_type, chunk)

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def create_streaming_delta_chunk(text: str) -> tuple[str, dict[str, Any]]:
    """
    Create a streaming delta chunk for Anthropic API format.

    Args:
        text: The text content to include

    Returns:
        Tuple of (event_type, chunk)
    """
    return (
        "content_block_delta",
        {
            "type": "content_block_delta",
            "index": 0,
            "delta": {"type": "text_delta", "text": text},
        },
    )

create_streaming_end_chunks staticmethod

create_streaming_end_chunks(
    stop_reason="end_turn", stop_sequence=None
)

Create the final streaming chunks for Anthropic API format.

Parameters:

Name Type Description Default
stop_reason str

The reason for stopping

'end_turn'
stop_sequence str | None

The stop sequence used (if any)

None

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of tuples (event_type, chunk) for final streaming chunks

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def create_streaming_end_chunks(
    stop_reason: str = "end_turn", stop_sequence: str | None = None
) -> list[tuple[str, dict[str, Any]]]:
    """
    Create the final streaming chunks for Anthropic API format.

    Args:
        stop_reason: The reason for stopping
        stop_sequence: The stop sequence used (if any)

    Returns:
        List of tuples (event_type, chunk) for final streaming chunks
    """
    return [
        # Then, send message_delta with stop reason and usage
        (
            "message_delta",
            {
                "type": "message_delta",
                "delta": {
                    "stop_reason": stop_reason,
                    "stop_sequence": stop_sequence,
                },
                "usage": {"output_tokens": 0},
            },
        ),
        # Finally, send message_stop
        ("message_stop", {"type": "message_stop"}),
    ]

create_ping_chunk staticmethod

create_ping_chunk()

Create a ping chunk for keeping the connection alive.

Returns:

Type Description
tuple[str, dict[str, Any]]

Tuple of (event_type, chunk)

Source code in ccproxy/claude_sdk/converter.py
@staticmethod
def create_ping_chunk() -> tuple[str, dict[str, Any]]:
    """
    Create a ping chunk for keeping the connection alive.

    Returns:
        Tuple of (event_type, chunk)
    """
    return ("ping", {"type": "ping"})

ClaudeSDKError

Bases: Exception

Base Claude SDK error.

StreamTimeoutError

StreamTimeoutError(message, session_id, timeout_seconds)

Bases: ClaudeSDKError

Stream timeout error when no SDK message is received within timeout.

Source code in ccproxy/claude_sdk/exceptions.py
def __init__(self, message: str, session_id: str, timeout_seconds: float):
    super().__init__(message)
    self.session_id = session_id
    self.timeout_seconds = timeout_seconds

OptionsHandler

OptionsHandler(settings=None)

Handles creation and management of Claude SDK options.

Parameters:

Name Type Description Default
settings Settings | None

Application settings containing default Claude options

None
Source code in ccproxy/claude_sdk/options.py
def __init__(self, settings: Settings | None = None) -> None:
    """
    Initialize options handler.

    Args:
        settings: Application settings containing default Claude options
    """
    self.settings = settings

create_options

create_options(
    model,
    temperature=None,
    max_tokens=None,
    system_message=None,
    **additional_options,
)

Create Claude SDK options from API parameters.

Parameters:

Name Type Description Default
model str

The model name

required
temperature float | None

Temperature for response generation

None
max_tokens int | None

Maximum tokens in response

None
system_message str | None

System message to include

None
**additional_options Any

Additional options to set on the ClaudeCodeOptions instance

{}

Returns:

Type Description
ClaudeCodeOptions

Configured ClaudeCodeOptions instance

Source code in ccproxy/claude_sdk/options.py
def create_options(
    self,
    model: str,
    temperature: float | None = None,
    max_tokens: int | None = None,
    system_message: str | None = None,
    **additional_options: Any,
) -> ClaudeCodeOptions:
    """
    Create Claude SDK options from API parameters.

    Args:
        model: The model name
        temperature: Temperature for response generation
        max_tokens: Maximum tokens in response
        system_message: System message to include
        **additional_options: Additional options to set on the ClaudeCodeOptions instance

    Returns:
        Configured ClaudeCodeOptions instance
    """
    # Start with configured defaults if available, otherwise create fresh instance
    if self.settings and self.settings.claude.code_options:
        # Use the configured options as base - this preserves all default settings
        # including complex objects like mcp_servers and permission_prompt_tool_name
        configured_opts = self.settings.claude.code_options

        # Create a new instance with the same configuration
        # We need to extract the configuration values properly with type safety

        # Extract configuration values with proper types
        mcp_servers = (
            configured_opts.mcp_servers.copy()
            if isinstance(configured_opts.mcp_servers, dict)
            else {}
        )
        permission_prompt_tool_name = configured_opts.permission_prompt_tool_name
        max_thinking_tokens = getattr(configured_opts, "max_thinking_tokens", None)
        allowed_tools = getattr(configured_opts, "allowed_tools", None)
        disallowed_tools = getattr(configured_opts, "disallowed_tools", None)
        cwd = getattr(configured_opts, "cwd", None)
        append_system_prompt = getattr(
            configured_opts, "append_system_prompt", None
        )
        max_turns = getattr(configured_opts, "max_turns", None)
        continue_conversation = getattr(
            configured_opts, "continue_conversation", None
        )
        permission_mode = getattr(configured_opts, "permission_mode", None)

        # Build ClaudeCodeOptions with proper type handling
        # Start with a basic instance and set attributes individually for type safety
        options = ClaudeCodeOptions(
            mcp_servers=mcp_servers,
            permission_prompt_tool_name=permission_prompt_tool_name,
        )

        # Set additional attributes if they exist and are not None
        if max_thinking_tokens is not None:
            options.max_thinking_tokens = int(max_thinking_tokens)
        if allowed_tools is not None:
            options.allowed_tools = list(allowed_tools)
        if disallowed_tools is not None:
            options.disallowed_tools = list(disallowed_tools)
        if cwd is not None:
            options.cwd = cwd
        if append_system_prompt is not None:
            options.append_system_prompt = append_system_prompt
        if max_turns is not None:
            options.max_turns = max_turns
        if continue_conversation is not None:
            options.continue_conversation = bool(continue_conversation)
        if permission_mode is not None:
            options.permission_mode = permission_mode
    else:
        options = ClaudeCodeOptions()

    # Override the model (API parameter takes precedence)
    options.model = model

    # Apply system message if provided (this is supported by ClaudeCodeOptions)
    if system_message is not None:
        options.system_prompt = system_message

    # If session_id is provided via additional_options, enable continue_conversation
    # This ensures conversation continuity when using session IDs
    if additional_options.get("session_id"):
        options.continue_conversation = True

    # Note: temperature and max_tokens are API-level parameters, not ClaudeCodeOptions parameters
    # These are handled at the API request level, not in the options object

    # Handle additional options as needed
    for key, value in additional_options.items():
        if hasattr(options, key):
            setattr(options, key, value)

    return options

extract_system_message staticmethod

extract_system_message(messages)

Extract system message from Anthropic messages format.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of messages in Anthropic format

required

Returns:

Type Description
str | None

System message content if found, None otherwise

Source code in ccproxy/claude_sdk/options.py
@staticmethod
def extract_system_message(messages: list[dict[str, Any]]) -> str | None:
    """
    Extract system message from Anthropic messages format.

    Args:
        messages: List of messages in Anthropic format

    Returns:
        System message content if found, None otherwise
    """
    for message in messages:
        if message.get("role") == "system":
            content = message.get("content", "")
            if isinstance(content, list):
                # Handle content blocks
                text_parts = []
                for block in content:
                    if block.get("type") == "text":
                        text_parts.append(block.get("text", ""))
                return " ".join(text_parts)
            return str(content)
    return None

get_supported_models staticmethod

get_supported_models()

Get list of supported Claude models.

Returns:

Type Description
list[str]

List of supported model names

Source code in ccproxy/claude_sdk/options.py
@staticmethod
def get_supported_models() -> list[str]:
    """
    Get list of supported Claude models.

    Returns:
        List of supported model names
    """
    # Import here to avoid circular imports
    from ccproxy.utils.model_mapping import get_supported_claude_models

    # Get supported Claude models
    claude_models = get_supported_claude_models()
    return claude_models

validate_model staticmethod

validate_model(model)

Validate if a model is supported.

Parameters:

Name Type Description Default
model str

The model name to validate

required

Returns:

Type Description
bool

True if supported, False otherwise

Source code in ccproxy/claude_sdk/options.py
@staticmethod
def validate_model(model: str) -> bool:
    """
    Validate if a model is supported.

    Args:
        model: The model name to validate

    Returns:
        True if supported, False otherwise
    """
    return model in OptionsHandler.get_supported_models()

get_default_options staticmethod

get_default_options()

Get default options for API parameters.

Returns:

Type Description
dict[str, Any]

Dictionary of default API parameter values

Source code in ccproxy/claude_sdk/options.py
@staticmethod
def get_default_options() -> dict[str, Any]:
    """
    Get default options for API parameters.

    Returns:
        Dictionary of default API parameter values
    """
    return {
        "model": "claude-3-5-sonnet-20241022",
        "temperature": 0.7,
        "max_tokens": 4000,
    }

parse_formatted_sdk_content

parse_formatted_sdk_content(text, collect_tool_calls=False)

Parse XML-formatted SDK content from text blocks.

This is the main parsing function that handles all types of XML-formatted SDK content by applying the appropriate parsing functions in sequence.

Parameters:

Name Type Description Default
text str

Text content that may contain XML-formatted SDK data

required
collect_tool_calls bool

Whether to collect tool calls for OpenAI format conversion (used by OpenAI adapter, not by streaming processor)

False

Returns:

Type Description
str

Tuple of (cleaned_text, tool_calls_list)

list[Any]
  • cleaned_text: Text with XML-formatted SDK content converted to readable format
tuple[str, list[Any]]
  • tool_calls_list: List of tool calls (empty if collect_tool_calls=False)
Source code in ccproxy/claude_sdk/parser.py
def parse_formatted_sdk_content(
    text: str, collect_tool_calls: bool = False
) -> tuple[str, list[Any]]:
    """Parse XML-formatted SDK content from text blocks.

    This is the main parsing function that handles all types of XML-formatted
    SDK content by applying the appropriate parsing functions in sequence.

    Args:
        text: Text content that may contain XML-formatted SDK data
        collect_tool_calls: Whether to collect tool calls for OpenAI format conversion
                           (used by OpenAI adapter, not by streaming processor)

    Returns:
        Tuple of (cleaned_text, tool_calls_list)
        - cleaned_text: Text with XML-formatted SDK content converted to readable format
        - tool_calls_list: List of tool calls (empty if collect_tool_calls=False)
    """
    if not text:
        return text, []

    # Apply all parsing functions in sequence
    cleaned_text = text

    # Parse system messages
    cleaned_text = parse_system_message_tags(cleaned_text)

    # Parse tool use blocks (may collect tool calls)
    cleaned_text, tool_calls = parse_tool_use_sdk_tags(cleaned_text, collect_tool_calls)

    # Parse tool result blocks
    cleaned_text = parse_tool_result_sdk_tags(cleaned_text)

    # Parse result message blocks
    cleaned_text = parse_result_message_tags(cleaned_text)

    # Parse text tags (unwrap content) - do this last
    cleaned_text = parse_text_tags(cleaned_text)

    return cleaned_text.strip(), tool_calls