Skip to content

ccproxy.services.claude_sdk_service

ccproxy.services.claude_sdk_service

Claude SDK service orchestration for business logic.

ClaudeSDKService

ClaudeSDKService(
    sdk_client=None,
    auth_manager=None,
    metrics=None,
    settings=None,
    session_manager=None,
)

Service layer for Claude SDK operations orchestration.

This class handles business logic coordination between the pure SDK client, authentication, metrics, and format conversion while maintaining clean separation of concerns.

Parameters:

Name Type Description Default
sdk_client ClaudeSDKClient | None

Claude SDK client instance

None
auth_manager AuthManager | None

Authentication manager (optional)

None
metrics PrometheusMetrics | None

Prometheus metrics instance (optional)

None
settings Settings | None

Application settings (optional)

None
session_manager SessionManager | None

Session manager for dependency injection (optional)

None
Source code in ccproxy/services/claude_sdk_service.py
def __init__(
    self,
    sdk_client: ClaudeSDKClient | None = None,
    auth_manager: AuthManager | None = None,
    metrics: PrometheusMetrics | None = None,
    settings: Settings | None = None,
    session_manager: SessionManager | None = None,
) -> None:
    """
    Initialize Claude SDK service.

    Args:
        sdk_client: Claude SDK client instance
        auth_manager: Authentication manager (optional)
        metrics: Prometheus metrics instance (optional)
        settings: Application settings (optional)
        session_manager: Session manager for dependency injection (optional)
    """
    self.sdk_client = sdk_client or ClaudeSDKClient(
        settings=settings, session_manager=session_manager
    )
    self.auth_manager = auth_manager
    self.metrics = metrics
    self.settings = settings
    self.message_converter = MessageConverter()
    self.options_handler = OptionsHandler(settings=settings)
    self.stream_processor = ClaudeStreamProcessor(
        message_converter=self.message_converter,
        metrics=self.metrics,
    )

create_completion async

create_completion(
    request_context,
    messages,
    model,
    temperature=None,
    max_tokens=None,
    stream=False,
    session_id=None,
    **kwargs,
)

Create a completion using Claude SDK with business logic orchestration.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of messages in Anthropic format

required
model str

The model to use

required
temperature float | None

Temperature for response generation

None
max_tokens int | None

Maximum tokens in response

None
stream bool

Whether to stream responses

False
session_id str | None

Optional session ID for Claude SDK integration

None
request_context RequestContext

Existing request context to use instead of creating new one

required
**kwargs Any

Additional arguments

{}

Returns:

Type Description
MessageResponse | AsyncIterator[dict[str, Any]]

Response dict or async iterator of response chunks if streaming

Raises:

Type Description
ClaudeProxyError

If request fails

ServiceUnavailableError

If service is unavailable

Source code in ccproxy/services/claude_sdk_service.py
async def create_completion(
    self,
    request_context: RequestContext,
    messages: list[dict[str, Any]],
    model: str,
    temperature: float | None = None,
    max_tokens: int | None = None,
    stream: bool = False,
    session_id: str | None = None,
    **kwargs: Any,
) -> MessageResponse | AsyncIterator[dict[str, Any]]:
    """
    Create a completion using Claude SDK with business logic orchestration.

    Args:
        messages: List of messages in Anthropic format
        model: The model to use
        temperature: Temperature for response generation
        max_tokens: Maximum tokens in response
        stream: Whether to stream responses
        session_id: Optional session ID for Claude SDK integration
        request_context: Existing request context to use instead of creating new one
        **kwargs: Additional arguments

    Returns:
        Response dict or async iterator of response chunks if streaming

    Raises:
        ClaudeProxyError: If request fails
        ServiceUnavailableError: If service is unavailable
    """

    # Extract system message and create options
    system_message = self.options_handler.extract_system_message(messages)

    # Map model to Claude model
    model = map_model_to_claude(model)

    options = self.options_handler.create_options(
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        system_message=system_message,
        session_id=session_id,
        **kwargs,
    )

    # Messages will be converted to SDK format in the client layer

    # Use existing context, but update metadata for this service (preserve original service_type)
    ctx = request_context
    metadata = {
        "endpoint": "messages",
        "model": model,
        "streaming": stream,
    }
    if session_id:
        metadata["session_id"] = session_id
    ctx.add_metadata(**metadata)
    # Use existing request ID from context
    request_id = ctx.request_id

    try:
        # Log SDK request parameters
        timestamp = ctx.get_log_timestamp_prefix() if ctx else None
        await self._log_sdk_request(
            request_id, messages, options, model, stream, session_id, timestamp
        )

        if stream:
            # For streaming, return the async iterator directly
            # Access logging will be handled by the stream processor when ResultMessage is received
            return self._stream_completion(
                ctx, messages, options, model, session_id, timestamp
            )
        else:
            result = await self._complete_non_streaming(
                ctx, messages, options, model, session_id, timestamp
            )
            return result
    except (ClaudeProxyError, ServiceUnavailableError) as e:
        # Add error info to context for automatic access logging
        ctx.add_metadata(error_message=str(e), error_type=type(e).__name__)
        raise

validate_health async

validate_health()

Validate that the service is healthy.

Returns:

Type Description
bool

True if healthy, False otherwise

Source code in ccproxy/services/claude_sdk_service.py
async def validate_health(self) -> bool:
    """
    Validate that the service is healthy.

    Returns:
        True if healthy, False otherwise
    """
    try:
        return await self.sdk_client.validate_health()
    except Exception as e:
        logger.error(
            "health_check_failed",
            error=str(e),
            error_type=type(e).__name__,
            exc_info=True,
        )
        return False

interrupt_session async

interrupt_session(session_id)

Interrupt a Claude session due to client disconnection.

Parameters:

Name Type Description Default
session_id str

The session ID to interrupt

required

Returns:

Type Description
bool

True if session was found and interrupted, False otherwise

Source code in ccproxy/services/claude_sdk_service.py
async def interrupt_session(self, session_id: str) -> bool:
    """Interrupt a Claude session due to client disconnection.

    Args:
        session_id: The session ID to interrupt

    Returns:
        True if session was found and interrupted, False otherwise
    """
    return await self.sdk_client.interrupt_session(session_id)

close async

close()

Close the service and cleanup resources.

Source code in ccproxy/services/claude_sdk_service.py
async def close(self) -> None:
    """Close the service and cleanup resources."""
    await self.sdk_client.close()