Skip to content

ccproxy.plugins.claude_sdk.adapter

ccproxy.plugins.claude_sdk.adapter

Claude SDK adapter implementation using delegation pattern.

ClaudeSDKAdapter

ClaudeSDKAdapter(
    config,
    session_manager=None,
    metrics=None,
    hook_manager=None,
    **kwargs,
)

Bases: BaseHTTPAdapter

Claude SDK adapter implementation using delegation pattern.

This adapter integrates with the application request lifecycle, following the same pattern as the claude_api and codex plugins.

Parameters:

Name Type Description Default
config ClaudeSDKSettings

SDK configuration settings

required
session_manager SessionManager | None

Optional session manager for session handling

None
metrics IMetricsCollector | None

Optional metrics collector

None
hook_manager Any | None

Optional hook manager for emitting events

None
Source code in ccproxy/plugins/claude_sdk/adapter.py
def __init__(
    self,
    config: ClaudeSDKSettings,
    # Optional dependencies
    session_manager: SessionManager | None = None,
    metrics: "IMetricsCollector | None" = None,
    hook_manager: Any | None = None,
    **kwargs: Any,
) -> None:
    """Build the adapter from its settings and optional collaborators.

    Args:
        config: SDK configuration settings
        session_manager: Session manager to use; when omitted, one is
            created here if the SDK session pool is enabled in ``config``
        metrics: Optional metrics collector
        hook_manager: Optional hook manager, also handed to the handler
        **kwargs: Extra keyword arguments forwarded to the base adapter
    """

    def _session_pool_enabled() -> bool:
        # True only when a pool section exists and is switched on.
        pool = config.sdk_session_pool
        return bool(pool and pool.enabled)

    # The base class wants an auth manager and an HTTP pool manager; this
    # adapter passes None for both because it does not use external HTTP.
    super().__init__(
        config=config,
        auth_manager=None,
        http_pool_manager=None,
        **kwargs,
    )
    self.metrics = metrics
    self.hook_manager = hook_manager

    # Resolve the default session ID: auto-generated takes precedence over
    # the configured one; otherwise there is none.
    default_session_id = None
    if config.auto_generate_default_session and _session_pool_enabled():
        # Random ID that lives only as long as this adapter instance.
        default_session_id = f"auto-{uuid.uuid4().hex[:12]}"
        logger.debug(
            "auto_generated_session",
            session_id=default_session_id,
            lifetime="runtime",
        )
    elif config.default_session_id:
        default_session_id = config.default_session_id
        logger.debug(
            "using_configured_default_session",
            session_id=default_session_id,
        )
    self._runtime_default_session_id = default_session_id

    # A caller-supplied session manager always wins; otherwise create one
    # only when the pool is enabled.
    if session_manager is None and _session_pool_enabled():
        session_manager = SessionManager(config=config)
        logger.debug(
            "adapter_session_pool_enabled",
            session_ttl=config.sdk_session_pool.session_ttl,
            max_sessions=config.sdk_session_pool.max_sessions,
            has_default_session=bool(default_session_id),
            auto_generated=config.auto_generate_default_session,
        )
    self.session_manager = session_manager

    self.handler: ClaudeSDKHandler | None = ClaudeSDKHandler(
        config=config,
        session_manager=session_manager,
        hook_manager=hook_manager,
    )
    # Replaces the None passed to the base class above.
    self.auth_manager = NoOpAuthManager()
    self._detection_service: Any | None = None
    self._initialized = False
    self._format_adapter_cache: dict[tuple[str, ...], FormatAdapterProtocol] = {}

initialize async

initialize()

Initialize the adapter and start session manager if needed.

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def initialize(self) -> None:
    """Start the session manager (if any) once and mark the adapter ready.

    Calling this again after a successful initialization is a no-op.
    """
    if self._initialized:
        return

    manager = self.session_manager
    if manager:
        await manager.start()
        logger.debug("session_manager_started")

    self._initialized = True

set_detection_service

set_detection_service(detection_service)

Set the detection service.

Parameters:

Name Type Description Default
detection_service Any

Claude CLI detection service

required
Source code in ccproxy/plugins/claude_sdk/adapter.py
def set_detection_service(self, detection_service: Any) -> None:
    """Attach the detection service to this adapter.

    Args:
        detection_service: Claude CLI detection service; stored as-is and
            replaces any previously set service
    """
    self._detection_service = detection_service

handle_streaming async

handle_streaming(request, endpoint, **kwargs)

Handle a streaming request through Claude SDK.

This is a convenience method that ensures stream=true and delegates to handle_request which handles both streaming and non-streaming.

Parameters:

Name Type Description Default
request Request

FastAPI request object

required
endpoint str

Target endpoint path

required
**kwargs Any

Additional arguments

{}

Returns:

Type Description
StreamingResponse

Streaming response from Claude SDK

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def handle_streaming(
    self, request: Request, endpoint: str, **kwargs: Any
) -> StreamingResponse:
    """Handle a streaming request through Claude SDK.

    This is a convenience method that rewrites the request body so that
    ``stream`` is ``True`` and then delegates to ``handle_request``, which
    handles both streaming and non-streaming.

    A body that is empty, is not decodable JSON, or is valid JSON but not
    an object is replaced by the minimal payload ``{"stream": true}``.

    Args:
        request: FastAPI request object
        endpoint: Target endpoint path (accepted for interface
            compatibility; not used by this method)
        **kwargs: Additional arguments (not used by this method)

    Returns:
        Streaming response from Claude SDK. If ``handle_request`` returns a
        non-streaming response, its body is wrapped in a single-chunk
        ``text/event-stream`` response.
    """
    if not self._initialized:
        await self.initialize()

    # Parse the request body; anything that is not a JSON object falls
    # back to an empty payload so the stream flag can be set safely.
    body = await request.body()
    request_data: dict[str, Any] = {}
    if body:
        try:
            parsed = json.loads(body)
        except ValueError:
            # ValueError covers json.JSONDecodeError and the
            # UnicodeDecodeError raised for bytes that are not valid UTF-8.
            parsed = None
        # Valid JSON may still be a list, string or number; item
        # assignment below only works on an object.
        if isinstance(parsed, dict):
            request_data = parsed

    # Force streaming
    request_data["stream"] = True
    modified_body = json.dumps(request_data).encode()

    # Create modified request with stream=true
    modified_scope = {
        **request.scope,
        "_body": modified_body,
    }

    modified_request = StarletteRequest(
        scope=modified_scope,
        receive=request.receive,
    )
    # Pre-populate the cached body so the rewritten payload is what gets
    # read, rather than the original receive stream.
    modified_request._body = modified_body

    # Delegate to handle_request which will handle streaming
    result = await self.handle_request(modified_request)

    # Ensure we return a streaming response
    if not isinstance(result, StreamingResponse):
        # This shouldn't happen since we forced stream=true, but handle it gracefully
        logger.warning(
            "unexpected_response_type",
            expected="StreamingResponse",
            actual=type(result).__name__,
        )
        return StreamingResponse(
            iter([result.body if hasattr(result, "body") else b""]),
            media_type="text/event-stream",
            headers={"X-Claude-SDK-Response": "true"},
        )

    return result

cleanup async

cleanup()

Cleanup resources when shutting down.

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def cleanup(self) -> None:
    """Cleanup resources when shutting down.

    The session manager is shut down first, then the handler is closed.
    Each stage is attempted independently: a failure in one is logged as
    ``adapter_cleanup_failed`` and does not prevent the other from running.
    References are dropped and the adapter is marked uninitialized even
    when a stage fails. Exceptions derived from ``Exception`` are logged
    and not re-raised.
    """
    failed = False

    # Shutdown session manager first
    if self.session_manager:
        try:
            await self.session_manager.shutdown()
        except Exception as e:
            failed = True
            logger.error(
                "adapter_cleanup_failed",
                stage="session_manager",
                error=str(e),
                exc_info=e,
            )
        finally:
            self.session_manager = None

    # Close handler, even if the session manager failed to shut down
    if self.handler:
        try:
            await self.handler.close()
        except Exception as e:
            failed = True
            logger.error(
                "adapter_cleanup_failed",
                stage="handler",
                error=str(e),
                exc_info=e,
            )
        finally:
            self.handler = None

    # Clear references to prevent memory leaks
    self._detection_service = None

    # Mark as not initialized
    self._initialized = False

    if not failed:
        logger.debug("adapter_cleanup_completed")

close async

close()

Compatibility method - delegates to cleanup().

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def close(self) -> None:
    """Alias for ``cleanup()``, kept for callers that expect ``close()``."""
    await self.cleanup()

prepare_provider_request async

prepare_provider_request(body, headers, endpoint)

Prepare request for ClaudeSDK (minimal implementation).

ClaudeSDK uses the local Claude SDK rather than making HTTP requests, so this just passes through the body and headers.

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def prepare_provider_request(
    self, body: bytes, headers: dict[str, str], endpoint: str
) -> tuple[bytes, dict[str, str]]:
    """Return the request body and headers untouched.

    This adapter does not build an outbound HTTP request, so there is
    nothing to rewrite; ``endpoint`` is ignored.

    Returns:
        The same ``body`` and ``headers`` objects that were passed in
    """
    passthrough = (body, headers)
    return passthrough

process_provider_response async

process_provider_response(response, endpoint)

Process response from ClaudeSDK (minimal implementation).

ClaudeSDK handles response processing in handle_request method, so this should not be called in normal operation.

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def process_provider_response(
    self, response: "httpx.Response", endpoint: str
) -> Response | StreamingResponse:
    """Process response from ClaudeSDK (minimal implementation).

    ClaudeSDK handles response processing in handle_request method,
    so this should not be called in normal operation. As a fallback it
    copies the upstream status code, body and headers into a plain
    ``Response``; ``endpoint`` is ignored.

    Headers that describe the upstream wire framing (``content-encoding``,
    ``content-length``, ``transfer-encoding``) are not forwarded.

    Returns:
        A non-streaming ``Response`` mirroring the upstream response
    """
    # Assuming ``response`` is an httpx.Response as annotated, ``content``
    # is already decoded (e.g. gunzipped), so the upstream encoding and
    # length headers may no longer describe the bytes being returned.
    stale_headers = ("content-encoding", "content-length", "transfer-encoding")
    forwarded_headers = {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in stale_headers
    }

    # This shouldn't be called for ClaudeSDK, but provide a fallback
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
    )

get_target_url async

get_target_url(endpoint)

Get target URL for ClaudeSDK (minimal implementation).

ClaudeSDK uses the local SDK rather than HTTP URLs, so this returns a placeholder URL.

Source code in ccproxy/plugins/claude_sdk/adapter.py
async def get_target_url(self, endpoint: str) -> str:
    """Build a placeholder URL for the given endpoint.

    No HTTP URL is involved for this adapter, so the result uses a
    ``claude-sdk://local/`` prefix followed by ``endpoint`` with all
    leading slashes removed.
    """
    relative_path = endpoint.lstrip("/")
    return "claude-sdk://local/" + relative_path