Skip to content

ccproxy.services.adapters.mock_adapter

ccproxy.services.adapters.mock_adapter

Mock adapter for bypass mode.

MockAdapter

MockAdapter(mock_handler)

Bases: BaseAdapter

Adapter for bypass/mock mode.

Source code in ccproxy/services/adapters/mock_adapter.py
def __init__(self, mock_handler: MockResponseHandler) -> None:
    self.mock_handler = mock_handler

cleanup async

cleanup()

Release adapter resources.

Source code in ccproxy/services/adapters/mock_adapter.py
async def cleanup(self) -> None:
    """Release adapter resources."""
    return None

handle_request async

handle_request(request)

Handle request using mock handler.

Source code in ccproxy/services/adapters/mock_adapter.py
async def handle_request(
    self, request: Request
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle request using mock handler."""
    body = await request.body()
    message_type = self.mock_handler.extract_message_type(body)
    prompt_text = self.mock_handler.extract_prompt_text(body)

    # Get endpoint from context or request URL
    endpoint = request.url.path
    if hasattr(request.state, "context"):
        ctx = request.state.context
        endpoint = ctx.metadata.get("endpoint", request.url.path)

    target_format = self._resolve_target_format(request, endpoint)
    model = "unknown"
    try:
        body_json = json.loads(body) if body else {}
        model = body_json.get("model", "unknown")
    except json.JSONDecodeError:
        pass
    except UnicodeDecodeError:
        pass
    except Exception as e:
        logger.debug("stream_flag_extraction_error", error=str(e))

    # Create request context
    ctx = RequestContext(
        request_id="mock-request",
        start_time=time.perf_counter(),
        logger=structlog.get_logger(__name__),
    )

    if self._extract_stream_flag(body):
        return cast(
            StreamingResponse | DeferredStreaming,
            await self.mock_handler.generate_streaming_response(
                model, target_format, ctx, message_type, prompt_text
            ),
        )
    else:
        (
            status,
            headers,
            response_body,
        ) = await self.mock_handler.generate_standard_response(
            model, target_format, ctx, message_type, prompt_text
        )
        return Response(content=response_body, status_code=status, headers=headers)

handle_streaming async

handle_streaming(request, endpoint, **kwargs)

Handle a streaming request.

Source code in ccproxy/services/adapters/mock_adapter.py
async def handle_streaming(
    self, request: Request, endpoint: str, **kwargs: Any
) -> StreamingResponse:
    """Handle a streaming request."""
    body = await request.body()
    message_type = self.mock_handler.extract_message_type(body)
    prompt_text = self.mock_handler.extract_prompt_text(body)
    target_format = self._resolve_target_format(request, endpoint)
    model = "unknown"
    try:
        body_json = json.loads(body) if body else {}
        model = body_json.get("model", "unknown")
    except json.JSONDecodeError:
        pass
    except UnicodeDecodeError:
        pass
    except Exception as e:
        logger.debug("stream_flag_extraction_error", error=str(e))

    # Create request context
    ctx = RequestContext(
        request_id=kwargs.get("request_id", "mock-stream-request"),
        start_time=time.perf_counter(),
        logger=structlog.get_logger(__name__),
    )

    return cast(
        StreamingResponse,
        await self.mock_handler.generate_streaming_response(
            model, target_format, ctx, message_type, prompt_text
        ),
    )