ccproxy.services.adapters.mock_adapter

Mock adapter for bypass mode.

MockAdapter

MockAdapter(mock_handler)

Bases: BaseAdapter

Adapter for bypass/mock mode.

Source code in ccproxy/services/adapters/mock_adapter.py
def __init__(self, mock_handler: MockResponseHandler) -> None:
    self.mock_handler = mock_handler
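
For orientation, a minimal construction sketch. Only MockAdapter(mock_handler) itself comes from the signature above; the import path and no-argument constructor of MockResponseHandler are assumptions and may differ in the actual package layout.

# Sketch only: the MockResponseHandler import path and constructor are assumed,
# not taken from the documented source.
from ccproxy.services.adapters.mock_adapter import MockAdapter
from ccproxy.services.mock_handler import MockResponseHandler  # assumed module path

adapter = MockAdapter(mock_handler=MockResponseHandler())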

handle_request async

handle_request(request)

Handle request using mock handler.

Source code in ccproxy/services/adapters/mock_adapter.py
async def handle_request(
    self, request: Request
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle request using mock handler."""
    body = await request.body()
    message_type = self.mock_handler.extract_message_type(body)

    # Get endpoint from context or request URL
    endpoint = request.url.path
    if hasattr(request.state, "context"):
        ctx = request.state.context
        endpoint = ctx.metadata.get("endpoint", request.url.path)

    is_openai = "openai" in endpoint
    model = "unknown"
    try:
        body_json = json.loads(body) if body else {}
        model = body_json.get("model", "unknown")
    except json.JSONDecodeError:
        pass
    except UnicodeDecodeError:
        pass
    except Exception as e:
        logger.debug("stream_flag_extraction_error", error=str(e))
        pass

    # Create request context
    ctx = RequestContext(
        request_id="mock-request",
        start_time=time.perf_counter(),
        logger=structlog.get_logger(__name__),
    )

    if self._extract_stream_flag(body):
        return await self.mock_handler.generate_streaming_response(
            model, is_openai, ctx, message_type
        )
    else:
        (
            status,
            headers,
            response_body,
        ) = await self.mock_handler.generate_standard_response(
            model, is_openai, ctx, message_type
        )
        return Response(content=response_body, status_code=status, headers=headers)
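
A minimal sketch of the non-streaming path, assuming _extract_stream_flag (not shown above) reads the top-level "stream" field of the JSON body. StubHandler is a hypothetical duck-typed stand-in for MockResponseHandler, implementing only the methods this adapter calls; it is not part of the library.

# Illustrative sketch: StubHandler is a hypothetical stand-in for
# MockResponseHandler, implementing only the methods MockAdapter calls.
import asyncio
import json

from starlette.requests import Request
from starlette.responses import Response, StreamingResponse

from ccproxy.services.adapters.mock_adapter import MockAdapter


class StubHandler:
    def extract_message_type(self, body: bytes) -> str:
        return "text"

    async def generate_standard_response(self, model, is_openai, ctx, message_type):
        payload = json.dumps({"model": model, "mock": True}).encode()
        return 200, {"content-type": "application/json"}, payload

    async def generate_streaming_response(self, model, is_openai, ctx, message_type):
        async def events():
            yield b'data: {"mock": true}\n\n'
            yield b"data: [DONE]\n\n"

        return StreamingResponse(events(), media_type="text/event-stream")


async def main() -> None:
    adapter = MockAdapter(mock_handler=StubHandler())

    # "stream" is false, so handle_request is expected to take the
    # generate_standard_response branch (assuming _extract_stream_flag
    # inspects that field).
    body = json.dumps({"model": "mock-model", "stream": False}).encode()
    scope = {
        "type": "http",
        "method": "POST",
        "path": "/v1/messages",
        "headers": [],
        "query_string": b"",
    }

    async def receive():
        return {"type": "http.request", "body": body, "more_body": False}

    request = Request(scope, receive)
    response = await adapter.handle_request(request)
    print(response.status_code)  # 200, from the stub


asyncio.run(main())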

handle_streaming async

handle_streaming(request, endpoint, **kwargs)

Handle a streaming request.

Source code in ccproxy/services/adapters/mock_adapter.py
async def handle_streaming(
    self, request: Request, endpoint: str, **kwargs: Any
) -> StreamingResponse:
    """Handle a streaming request."""
    body = await request.body()
    message_type = self.mock_handler.extract_message_type(body)
    is_openai = "openai" in endpoint
    model = "unknown"
    try:
        body_json = json.loads(body) if body else {}
        model = body_json.get("model", "unknown")
    except json.JSONDecodeError:
        pass
    except UnicodeDecodeError:
        pass
    except Exception as e:
        logger.debug("stream_flag_extraction_error", error=str(e))
        pass

    # Create request context
    ctx = RequestContext(
        request_id=kwargs.get("request_id", "mock-stream-request"),
        start_time=time.perf_counter(),
        logger=structlog.get_logger(__name__),
    )

    return await self.mock_handler.generate_streaming_response(
        model, is_openai, ctx, message_type
    )