ccproxy.api.routes.claude

Claude SDK endpoints for CCProxy API Server.

create_openai_chat_completion async

create_openai_chat_completion(
    openai_request, claude_service, request
)

Create a chat completion using Claude SDK with OpenAI-compatible format.

This endpoint handles OpenAI API format requests and converts them to Anthropic format before using the Claude SDK directly.
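For reference, a minimal client call against this route might look as follows. This is a sketch, assuming the proxy listens on http://localhost:8000; the base URL, model id, and any auth headers are deployment-specific assumptions, not part of this module.

import httpx

# OpenAI-style payload; the route's adapter converts it to Anthropic format.
payload = {
    "model": "claude-3-5-sonnet-20241022",  # illustrative model id
    "messages": [{"role": "user", "content": "Hello!"}],
    "max_tokens": 128,
}

resp = httpx.post("http://localhost:8000/v1/chat/completions", json=payload, timeout=60)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])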

Source code in ccproxy/api/routes/claude.py
@router.post("/v1/chat/completions", response_model=None)
async def create_openai_chat_completion(
    openai_request: OpenAIChatCompletionRequest,
    claude_service: ClaudeServiceDep,
    request: Request,
) -> StreamingResponse | OpenAIChatCompletionResponse:
    """Create a chat completion using Claude SDK with OpenAI-compatible format.

    This endpoint handles OpenAI API format requests and converts them
    to Anthropic format before using the Claude SDK directly.
    """
    try:
        # Create adapter instance
        adapter = OpenAIAdapter()

        # Convert entire OpenAI request to Anthropic format using adapter
        anthropic_request = adapter.adapt_request(openai_request.model_dump())

        # Extract stream parameter
        stream = openai_request.stream or False

        # Get request context from middleware
        request_context = getattr(request.state, "context", None)

        if request_context is None:
            raise HTTPException(
                status_code=500, detail="Internal server error: no request context"
            )

        # Call Claude SDK service with adapted request
        response = await claude_service.create_completion(
            messages=anthropic_request["messages"],
            model=anthropic_request["model"],
            temperature=anthropic_request.get("temperature"),
            max_tokens=anthropic_request.get("max_tokens"),
            stream=stream,
            user_id=getattr(openai_request, "user", None),
            request_context=request_context,
        )

        if stream:
            # Handle streaming response
            async def openai_stream_generator() -> AsyncIterator[bytes]:
                # Use adapt_stream for streaming responses
                async for openai_chunk in adapter.adapt_stream(response):  # type: ignore[arg-type]
                    yield f"data: {json.dumps(openai_chunk)}\n\n".encode()
                # Send final chunk
                yield b"data: [DONE]\n\n"

            # Use unified streaming wrapper with logging
            return StreamingResponseWithLogging(
                content=openai_stream_generator(),
                request_context=request_context,
                metrics=getattr(claude_service, "metrics", None),
                status_code=200,
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )
        else:
            # Convert non-streaming response to OpenAI format using adapter
            # Convert MessageResponse model to dict for adapter
            # In non-streaming mode, response should always be MessageResponse
            assert isinstance(response, MessageResponse), (
                "Non-streaming response must be MessageResponse"
            )
            response_dict = response.model_dump()
            openai_response = adapter.adapt_response(response_dict)
            return OpenAIChatCompletionResponse.model_validate(openai_response)

    except Exception as e:
        # Re-raise specific proxy errors to be handled by the error handler
        from ccproxy.core.errors import ClaudeProxyError

        if isinstance(e, ClaudeProxyError):
            raise
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
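When stream is true, openai_stream_generator above emits OpenAI-style SSE frames terminated by a [DONE] sentinel. A consumption sketch under the same base-URL assumption; the chunk layout is assumed to follow the standard OpenAI streaming schema, which is what the adapter is expected to produce.

import json
import httpx

payload = {
    "model": "claude-3-5-sonnet-20241022",
    "messages": [{"role": "user", "content": "Hello!"}],
    "stream": True,
}

with httpx.stream(
    "POST", "http://localhost:8000/v1/chat/completions", json=payload, timeout=None
) as resp:
    for line in resp.iter_lines():
        if not line.startswith("data: "):
            continue  # blank separator lines between SSE events
        data = line[len("data: "):]
        if data == "[DONE]":  # final sentinel from openai_stream_generator
            break
        delta = json.loads(data)["choices"][0].get("delta", {})
        print(delta.get("content", ""), end="", flush=True)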

create_openai_chat_completion_with_session async

create_openai_chat_completion_with_session(
    session_id, openai_request, claude_service, request
)

Create a chat completion using Claude SDK with OpenAI-compatible format and session ID.

This endpoint handles OpenAI API format requests with session ID and converts them to Anthropic format before using the Claude SDK directly.
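Compared to the non-session route, only the session_id path segment changes; it is forwarded to claude_service.create_completion so the request is bound to an existing SDK session. A sketch, where the session identifier and base URL are illustrative:

import httpx

session_id = "my-session"  # hypothetical session identifier
payload = {
    "model": "claude-3-5-sonnet-20241022",
    "messages": [{"role": "user", "content": "Continue where we left off."}],
}

resp = httpx.post(
    f"http://localhost:8000/{session_id}/v1/chat/completions", json=payload
)
print(resp.json()["choices"][0]["message"]["content"])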

Source code in ccproxy/api/routes/claude.py
@router.post(
    "/{session_id}/v1/chat/completions",
    response_model=None,
)
async def create_openai_chat_completion_with_session(
    session_id: str,
    openai_request: OpenAIChatCompletionRequest,
    claude_service: ClaudeServiceDep,
    request: Request,
) -> StreamingResponse | OpenAIChatCompletionResponse:
    """Create a chat completion using Claude SDK with OpenAI-compatible format and session ID.

    This endpoint handles OpenAI API format requests with session ID and converts them
    to Anthropic format before using the Claude SDK directly.
    """
    try:
        # Create adapter instance
        adapter = OpenAIAdapter()

        # Convert entire OpenAI request to Anthropic format using adapter
        anthropic_request = adapter.adapt_request(openai_request.model_dump())

        # Extract stream parameter
        stream = openai_request.stream or False

        # Get request context from middleware
        request_context = getattr(request.state, "context", None)

        if request_context is None:
            raise HTTPException(
                status_code=500, detail="Internal server error: no request context"
            )

        # Call Claude SDK service with adapted request and session_id
        response = await claude_service.create_completion(
            messages=anthropic_request["messages"],
            model=anthropic_request["model"],
            temperature=anthropic_request.get("temperature"),
            max_tokens=anthropic_request.get("max_tokens"),
            stream=stream,
            user_id=getattr(openai_request, "user", None),
            session_id=session_id,
            request_context=request_context,
        )

        if stream:
            # Handle streaming response
            async def openai_stream_generator() -> AsyncIterator[bytes]:
                # Use adapt_stream for streaming responses
                async for openai_chunk in adapter.adapt_stream(response):  # type: ignore[arg-type]
                    yield f"data: {json.dumps(openai_chunk)}\n\n".encode()
                # Send final chunk
                yield b"data: [DONE]\n\n"

            # Use unified streaming wrapper with logging
            # Session interrupts are now handled directly by the StreamHandle
            return StreamingResponseWithLogging(
                content=openai_stream_generator(),
                request_context=request_context,
                metrics=getattr(claude_service, "metrics", None),
                status_code=200,
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )
        else:
            # Convert non-streaming response to OpenAI format using adapter
            # Convert MessageResponse model to dict for adapter
            # In non-streaming mode, response should always be MessageResponse
            assert isinstance(response, MessageResponse), (
                "Non-streaming response must be MessageResponse"
            )
            response_dict = response.model_dump()
            openai_response = adapter.adapt_response(response_dict)
            return OpenAIChatCompletionResponse.model_validate(openai_response)

    except Exception as e:
        # Re-raise specific proxy errors to be handled by the error handler
        from ccproxy.core.errors import ClaudeProxyError

        if isinstance(e, ClaudeProxyError):
            raise
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e

create_anthropic_message_with_session async

create_anthropic_message_with_session(
    session_id, message_request, claude_service, request
)

Create a message using Claude SDK with Anthropic format and session ID.

This endpoint handles Anthropic API format requests with session ID directly using the Claude SDK without any format conversion.
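Since this route accepts and returns the Anthropic Messages schema unchanged, a client posts a native Anthropic payload to the session-scoped path. A sketch, with illustrative identifiers and base URL:

import httpx

session_id = "my-session"  # hypothetical session identifier
payload = {
    "model": "claude-3-5-sonnet-20241022",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Hello!"}],
}

resp = httpx.post(f"http://localhost:8000/{session_id}/v1/messages", json=payload)
print(resp.json()["content"][0]["text"])  # Anthropic content blocks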

Source code in ccproxy/api/routes/claude.py
@router.post(
    "/{session_id}/v1/messages",
    response_model=None,
)
async def create_anthropic_message_with_session(
    session_id: str,
    message_request: MessageCreateParams,
    claude_service: ClaudeServiceDep,
    request: Request,
) -> StreamingResponse | MessageResponse:
    """Create a message using Claude SDK with Anthropic format and session ID.

    This endpoint handles Anthropic API format requests with session ID directly
    using the Claude SDK without any format conversion.
    """
    try:
        # Extract parameters from Anthropic request
        messages = [msg.model_dump() for msg in message_request.messages]
        model = message_request.model
        temperature = message_request.temperature
        max_tokens = message_request.max_tokens
        stream = message_request.stream or False

        # Get request context from middleware
        request_context = getattr(request.state, "context", None)
        if request_context is None:
            raise HTTPException(
                status_code=500, detail="Internal server error: no request context"
            )

        # Call Claude SDK service directly with Anthropic format and session_id
        response = await claude_service.create_completion(
            messages=messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            user_id=getattr(message_request, "user_id", None),
            session_id=session_id,
            request_context=request_context,
        )

        if stream:
            # Handle streaming response
            async def anthropic_stream_generator() -> AsyncIterator[bytes]:
                async for chunk in response:  # type: ignore[union-attr]
                    if chunk:
                        # All chunks from Claude SDK streaming should be dict format
                        # and need proper SSE event formatting
                        if isinstance(chunk, dict):
                            # Determine event type from chunk type
                            event_type = chunk.get("type", "message_delta")
                            yield f"event: {event_type}\n".encode()
                            yield f"data: {json.dumps(chunk)}\n\n".encode()
                        else:
                            # Fallback for unexpected format
                            yield f"data: {json.dumps(chunk)}\n\n".encode()
                # No final [DONE] chunk for Anthropic format

            # Use unified streaming wrapper with logging
            # Session interrupts are now handled directly by the StreamHandle
            return StreamingResponseWithLogging(
                content=anthropic_stream_generator(),
                request_context=request_context,
                metrics=getattr(claude_service, "metrics", None),
                status_code=200,
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )
        else:
            # Return Anthropic format response directly
            return MessageResponse.model_validate(response)

    except Exception as e:
        # Re-raise specific proxy errors to be handled by the error handler
        from ccproxy.core.errors import ClaudeProxyError

        if isinstance(e, ClaudeProxyError):
            raise
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e

create_anthropic_message async

create_anthropic_message(
    message_request, claude_service, request
)

Create a message using Claude SDK with Anthropic format.

This endpoint handles Anthropic API format requests directly using the Claude SDK without any format conversion.
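Because this handler also pulls session_id out of the request metadata, a client can pin a session without the path-based variant, provided this proxy's MessageCreateParams model accepts a session_id key in metadata (as the extraction in the source below implies). A sketch, under the same base-URL assumption:

import httpx

payload = {
    "model": "claude-3-5-sonnet-20241022",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Hello!"}],
    "metadata": {"session_id": "my-session"},  # hypothetical session identifier
}

resp = httpx.post("http://localhost:8000/v1/messages", json=payload)
print(resp.json()["content"][0]["text"])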

Source code in ccproxy/api/routes/claude.py
@router.post("/v1/messages", response_model=None)
async def create_anthropic_message(
    message_request: MessageCreateParams,
    claude_service: ClaudeServiceDep,
    request: Request,
) -> StreamingResponse | MessageResponse:
    """Create a message using Claude SDK with Anthropic format.

    This endpoint handles Anthropic API format requests directly
    using the Claude SDK without any format conversion.
    """
    try:
        # Extract parameters from Anthropic request
        messages = [msg.model_dump() for msg in message_request.messages]
        model = message_request.model
        temperature = message_request.temperature
        max_tokens = message_request.max_tokens
        stream = message_request.stream or False

        # Get request context from middleware
        request_context = getattr(request.state, "context", None)
        if request_context is None:
            raise HTTPException(
                status_code=500, detail="Internal server error: no request context"
            )

        # Extract session_id from metadata if present
        session_id = None
        if message_request.metadata:
            metadata_dict = message_request.metadata.model_dump()
            session_id = metadata_dict.get("session_id")

        # Call Claude SDK service directly with Anthropic format
        response = await claude_service.create_completion(
            messages=messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            user_id=getattr(message_request, "user_id", None),
            session_id=session_id,
            request_context=request_context,
        )

        if stream:
            # Handle streaming response
            async def anthropic_stream_generator() -> AsyncIterator[bytes]:
                async for chunk in response:  # type: ignore[union-attr]
                    if chunk:
                        # All chunks from Claude SDK streaming should be dict format
                        # and need proper SSE event formatting
                        if isinstance(chunk, dict):
                            # Determine event type from chunk type
                            event_type = chunk.get("type", "message_delta")
                            yield f"event: {event_type}\n".encode()
                            yield f"data: {json.dumps(chunk)}\n\n".encode()
                        else:
                            # Fallback for unexpected format
                            yield f"data: {json.dumps(chunk)}\n\n".encode()
                # No final [DONE] chunk for Anthropic format

            # Use unified streaming wrapper with logging for all requests
            # Session interrupts are now handled directly by the StreamHandle
            return StreamingResponseWithLogging(
                content=anthropic_stream_generator(),
                request_context=request_context,
                metrics=getattr(claude_service, "metrics", None),
                status_code=200,
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )
        else:
            # Return Anthropic format response directly
            return MessageResponse.model_validate(response)

    except Exception as e:
        # Re-raise specific proxy errors to be handled by the error handler
        from ccproxy.core.errors import ClaudeProxyError

        if isinstance(e, ClaudeProxyError):
            raise
        raise HTTPException(
            status_code=500, detail=f"Internal server error: {str(e)}"
        ) from e
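Unlike the OpenAI-compatible routes, this stream carries named SSE events (message_start, content_block_delta, message_stop, ...) and closes without a [DONE] sentinel, so a client keys off the event line instead. A consumption sketch under the same assumptions as the earlier examples, with the event names taken from the standard Anthropic streaming format:

import json
import httpx

payload = {
    "model": "claude-3-5-sonnet-20241022",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Hello!"}],
    "stream": True,
}

with httpx.stream(
    "POST", "http://localhost:8000/v1/messages", json=payload, timeout=None
) as resp:
    event_type = None
    for line in resp.iter_lines():
        if line.startswith("event: "):
            event_type = line[len("event: "):]
        elif line.startswith("data: "):
            data = json.loads(line[len("data: "):])
            if event_type == "content_block_delta":
                print(data["delta"].get("text", ""), end="", flush=True)
    # The stream simply ends after the final event; there is no [DONE] marker.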