ccproxy.testing.endpoints

Endpoint testing helpers for CCProxy.

EndpointRequestResult dataclass

EndpointRequestResult(
    phase, method, status_code, stream, details=dict()
)

Outcome of a single HTTP request made while executing a test.

EndpointTest dataclass

EndpointTest(
    name, endpoint, stream, request, model, description=""
)

Configuration for a single endpoint test.
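
A minimal sketch of defining a test, assuming EndpointTest is importable from ccproxy.testing.endpoints as this page suggests; the endpoint, request template key, and model below are illustrative placeholders, and request is assumed to name an entry in REQUEST_DATA:

from ccproxy.testing.endpoints import EndpointTest

# Hypothetical definition; values are placeholders, not shipped defaults.
example_test = EndpointTest(
    name="openai_chat_completions",
    endpoint="/v1/chat/completions",
    stream=False,
    request="openai_chat",  # assumed key into REQUEST_DATA
    model="gpt-4o-mini",
    description="Non-streaming chat completion smoke test",
)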

EndpointTestResult dataclass

EndpointTestResult(
    test,
    index,
    success,
    error=None,
    exception=None,
    request_results=list(),
)

Result of running a single endpoint test.

name property

name

Convenience access to the test name.

EndpointTestRunSummary dataclass

EndpointTestRunSummary(
    base_url,
    results,
    successful_count,
    failure_count,
    errors=list(),
)

Summary of executing a batch of endpoint tests.

total property

total

Total number of executed tests.

failed_results property

failed_results

Return the list of failed test results.

all_passed

all_passed()

Return True when every executed test succeeded.

Source code in ccproxy/testing/endpoints/models.py
def all_passed(self) -> bool:
    """Return True when every executed test succeeded."""
    return self.failure_count == 0 and not self.errors

assert_success

assert_success()

Raise AssertionError if any test failed (useful for pytest).

Source code in ccproxy/testing/endpoints/models.py
def assert_success(self) -> None:
    """Raise AssertionError if any test failed (useful for pytest)."""
    if self.all_passed():
        return

    failed_names = ", ".join(result.name for result in self.failed_results)
    parts = []
    if self.failure_count:
        parts.append(
            f"{self.failure_count} endpoint test(s) failed: {failed_names}"
        )
    if self.errors:
        parts.append("; additional errors: " + "; ".join(self.errors))

    raise AssertionError(" ".join(parts) if parts else "Endpoint test run failed")
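
A sketch of gating a pytest run on the summary, assuming run_endpoint_tests_async is importable from ccproxy.testing.endpoints and pytest-asyncio is installed:

import pytest

from ccproxy.testing.endpoints import run_endpoint_tests_async


@pytest.mark.asyncio
async def test_ccproxy_endpoints() -> None:
    # Runs every configured endpoint test against a local server and raises
    # AssertionError (failing this test) if any test failed or errors remain.
    summary = await run_endpoint_tests_async(base_url="http://127.0.0.1:8000")
    summary.assert_success()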

TestEndpoint

TestEndpoint(
    base_url="http://127.0.0.1:8000",
    trace=False,
    *,
    cors_origin=None,
    default_headers=None,
    client=None,
)

Test endpoint utility for CCProxy API testing.

Source code in ccproxy/testing/endpoints/runner.py
def __init__(
    self,
    base_url: str = "http://127.0.0.1:8000",
    trace: bool = False,
    *,
    cors_origin: str | None = None,
    default_headers: dict[str, str] | None = None,
    client: httpx.AsyncClient | None = None,
):
    self.base_url = base_url
    self.trace = trace
    self.cors_origin = cors_origin
    self.base_headers: dict[str, str] = {"Accept-Encoding": "identity"}

    if default_headers:
        self.base_headers.update(default_headers)

    if self.cors_origin:
        self.base_headers["Origin"] = self.cors_origin

    if client is None:
        self.client = httpx.AsyncClient(
            timeout=30.0,
            headers=self.base_headers.copy(),
        )
    else:
        self.client = client
        # Ensure client carries required defaults without overwriting explicit values
        for key, value in self.base_headers.items():
            if key not in self.client.headers:
                self.client.headers[key] = value
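
A sketch of constructing the utility directly with a pre-configured httpx.AsyncClient; TestEndpoint only fills in missing default headers (such as Accept-Encoding), so explicit client settings are preserved, and it is used as an async context manager just as run_endpoint_tests_async does below. The X-Env header and origin here are illustrative:

import asyncio

import httpx

from ccproxy.testing.endpoints import TestEndpoint


async def probe() -> None:
    client = httpx.AsyncClient(timeout=60.0, headers={"X-Env": "staging"})
    async with TestEndpoint(
        base_url="http://127.0.0.1:8000",
        cors_origin="https://example.test",
        client=client,
    ) as tester:
        # Run the full suite through the supplied client and fail loudly on errors.
        summary = await tester.run_all_tests()
        summary.assert_success()


asyncio.run(probe())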

extract_and_display_request_id

extract_and_display_request_id(headers, context=None)

Extract request ID from response headers and display it.

Source code in ccproxy/testing/endpoints/runner.py
def extract_and_display_request_id(
    self,
    headers: dict[str, Any],
    context: dict[str, Any] | None = None,
) -> str | None:
    """Extract request ID from response headers and display it."""
    request_id_headers = [
        "x-request-id",
        "request-id",
        "x-amzn-requestid",
        "x-correlation-id",
        "x-trace-id",
        "traceparent",
    ]

    request_id = None
    context_data = context or {}
    for header_name in request_id_headers:
        for key in [header_name, header_name.lower()]:
            if key in headers:
                request_id = headers[key]
                break
        if request_id:
            break

    if request_id:
        print(colored_info(f"-> Request ID: {request_id}"))
        logger.info(
            "Request ID extracted",
            request_id=request_id,
            **context_data,
        )
    else:
        logger.debug(
            "No request ID found in headers",
            available_headers=list(headers.keys()),
            **context_data,
        )

    return request_id
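
The lookup walks the candidate header names in order and returns the first match; a small illustration with a hand-built header dict, given a TestEndpoint instance named tester:

# Hypothetical headers; "x-request-id" wins because it is checked before "x-trace-id".
headers = {
    "content-type": "application/json",
    "x-request-id": "req_abc123",
    "x-trace-id": "trace_999",
}
request_id = tester.extract_and_display_request_id(headers, context={"phase": "demo"})
# request_id == "req_abc123"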

post_json async

post_json(
    url: str,
    payload: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    capture_result: Literal[False] = False,
) -> dict[str, Any]
post_json(
    url: str,
    payload: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    capture_result: Literal[True],
) -> tuple[dict[str, Any], EndpointRequestResult]
post_json(
    url,
    payload,
    *,
    context=None,
    headers=None,
    capture_result=False,
)

Post JSON request and return parsed response.

Source code in ccproxy/testing/endpoints/runner.py
async def post_json(
    self,
    url: str,
    payload: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    capture_result: bool = False,
) -> dict[str, Any] | tuple[dict[str, Any], EndpointRequestResult]:
    """Post JSON request and return parsed response."""
    request_headers = self._build_headers({"Content-Type": "application/json"})
    if headers:
        request_headers.update(headers)

    context_data = context or {}

    print(colored_info(f"-> Making JSON request to {url}"))
    logger.info(
        "Making JSON request",
        url=url,
        payload_model=payload.get("model"),
        payload_stream=payload.get("stream"),
        **context_data,
    )

    response = await self.client.post(url, json=payload, headers=request_headers)

    logger.info(
        "Received JSON response",
        status_code=response.status_code,
        headers=dict(response.headers),
        **context_data,
    )

    self.extract_and_display_request_id(
        dict(response.headers), context=context_data
    )

    status_code = response.status_code
    response_headers = dict(response.headers)

    parsed_body: dict[str, Any]
    if status_code != 200:
        print(colored_error(f"[ERROR] Request failed: HTTP {status_code}"))
        logger.error(
            "Request failed",
            status_code=status_code,
            response_text=response.text,
            **context_data,
        )
        parsed_body = {"error": f"HTTP {status_code}: {response.text}"}
    else:
        try:
            json_response = response.json()
        except json.JSONDecodeError as exc:  # noqa: TRY003
            logger.error(
                "Failed to parse JSON response",
                error=str(exc),
                **context_data,
            )
            parsed_body = {"error": f"JSON decode error: {exc}"}
        else:
            parsed_body = json_response

    request_result_details: dict[str, Any] = {
        "headers": response_headers,
    }
    if isinstance(parsed_body, dict):
        request_result_details["response"] = parsed_body
        error_detail = parsed_body.get("error")
        if error_detail:
            request_result_details["error_detail"] = error_detail
    else:
        request_result_details["response"] = parsed_body

    request_result = EndpointRequestResult(
        phase=context_data.get("phase", "initial"),
        method="POST",
        status_code=status_code,
        stream=False,
        details=request_result_details,
    )

    if capture_result:
        return parsed_body, request_result

    return parsed_body
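
The capture_result flag selects between the two overloads above: by default only the parsed body is returned, while capture_result=True also returns the EndpointRequestResult that feeds EndpointTestResult.request_results. A sketch inside an async function, given a TestEndpoint instance named tester; the payload and model name are illustrative:

payload = {
    "model": "gpt-4o-mini",  # illustrative model name
    "messages": [{"role": "user", "content": "ping"}],
    "stream": False,
}

# Body only.
body = await tester.post_json("http://127.0.0.1:8000/v1/chat/completions", payload)

# Body plus the per-request record.
body, request_result = await tester.post_json(
    "http://127.0.0.1:8000/v1/chat/completions",
    payload,
    context={"phase": "initial"},
    capture_result=True,
)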

post_stream async

post_stream(url, payload, *, context=None, headers=None)

Post streaming request and return SSE events and per-request results.

Source code in ccproxy/testing/endpoints/runner.py
async def post_stream(
    self,
    url: str,
    payload: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
) -> tuple[list[str], list[EndpointRequestResult]]:
    """Post streaming request and return list of SSE events."""
    request_headers = self._build_headers(
        {"Accept": "text/event-stream", "Content-Type": "application/json"}
    )
    if headers:
        request_headers.update(headers)

    context_data = context or {}

    print(colored_info(f"-> Making streaming request to {url}"))
    logger.info(
        "Making streaming request",
        url=url,
        payload_model=payload.get("model"),
        payload_stream=payload.get("stream"),
        **context_data,
    )

    events: list[str] = []
    raw_chunks: list[str] = []
    request_results: list[EndpointRequestResult] = []
    fallback_request_result: EndpointRequestResult | None = None
    fallback_used = False
    stream_status_code: int | None = None
    primary_event_count = 0

    try:
        async with self.client.stream(
            "POST", url, json=payload, headers=request_headers
        ) as resp:
            logger.info(
                "Streaming response received",
                status_code=resp.status_code,
                headers=dict(resp.headers),
                **context_data,
            )

            self.extract_and_display_request_id(
                dict(resp.headers), context=context_data
            )

            stream_status_code = resp.status_code

            if resp.status_code != 200:
                error_text = await resp.aread()
                error_message = error_text.decode()
                print(
                    colored_error(
                        f"[ERROR] Streaming request failed: HTTP {resp.status_code}"
                    )
                )
                logger.error(
                    "Streaming request failed",
                    status_code=resp.status_code,
                    response_text=error_message,
                    **context_data,
                )
                error_payload = json.dumps(
                    {
                        "error": {
                            "status": resp.status_code,
                            "message": error_message,
                        }
                    },
                    ensure_ascii=False,
                )
                events = [f"data: {error_payload}", "data: [DONE]"]
                request_results.append(
                    EndpointRequestResult(
                        phase=context_data.get("phase", "initial"),
                        method="POST",
                        status_code=stream_status_code,
                        stream=True,
                        details={
                            "event_count": len(events),
                            "error_detail": error_message,
                        },
                    )
                )
                return events, request_results

            buffer = ""
            async for chunk in resp.aiter_text():
                if not chunk:
                    continue

                raw_chunks.append(chunk)
                buffer += chunk

                # Completed SSE events are separated by a blank line.
                while "\n\n" in buffer:
                    raw_event, buffer = buffer.split("\n\n", 1)
                    if raw_event.strip():
                        events.append(raw_event.strip())

            if buffer.strip():
                events.append(buffer.strip())

    except Exception as exc:  # noqa: BLE001
        logger.error(
            "Streaming request exception",
            error=str(exc),
            **context_data,
        )
        error_payload = json.dumps(
            {"error": {"message": str(exc)}}, ensure_ascii=False
        )
        events.append(f"data: {error_payload}")
        events.append("data: [DONE]")
        request_results.append(
            EndpointRequestResult(
                phase=context_data.get("phase", "initial"),
                method="POST",
                status_code=stream_status_code,
                stream=True,
                details={
                    "event_count": len(events),
                    "error_detail": str(exc),
                },
            )
        )
        return events, request_results

    raw_text = "".join(raw_chunks).strip()
    primary_event_count = len(events)
    only_done = events and all(
        evt.strip().lower() == "data: [done]" for evt in events
    )

    if not events or only_done:
        logger.debug(
            "stream_response_empty",
            event_count=len(events),
            raw_length=len(raw_text),
            **context_data,
        )

        fallback_events: list[str] | None = None

        if raw_text and raw_text.lower() != "data: [done]":
            if raw_text.startswith("data:"):
                fallback_events = [raw_text, "data: [DONE]"]
            else:
                fallback_events = [f"data: {raw_text}", "data: [DONE]"]
        else:
            (
                fallback_events,
                fallback_request_result,
            ) = await self._fallback_stream_to_json(
                url=url,
                payload=payload,
                context=context_data,
            )

        if fallback_events:
            logger.info(
                "stream_fallback_applied",
                fallback_event_count=len(fallback_events),
                **context_data,
            )
            events = fallback_events
            fallback_used = True

    events = [evt.rstrip("'\"") if isinstance(evt, str) else evt for evt in events]

    request_details: dict[str, Any] = {
        "event_count": len(events),
    }
    if fallback_used:
        request_details["fallback_applied"] = True
    if primary_event_count and primary_event_count != len(events):
        request_details["primary_event_count"] = primary_event_count
    if raw_text:
        request_details["raw_preview"] = raw_text[:120]

    request_results.append(
        EndpointRequestResult(
            phase=context_data.get("phase", "initial"),
            method="POST",
            status_code=stream_status_code,
            stream=True,
            details=request_details,
        )
    )

    if fallback_request_result is not None:
        request_results.append(fallback_request_result)

    logger.info(
        "Streaming completed",
        event_count=len(events),
        **context_data,
    )
    return events, request_results
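
A sketch of consuming the streaming helper inside an async function; it always returns the collected SSE event strings together with the per-request records, and on failure the events carry a synthesized error payload followed by a data: [DONE] terminator:

events, request_results = await tester.post_stream(
    "http://127.0.0.1:8000/v1/chat/completions",
    {**payload, "stream": True},
    context={"phase": "initial"},
)

for event in events:
    # Basic structural check: every well-formed SSE event starts with "data: ".
    if not tester.validate_sse_event(event):
        print(f"unexpected SSE line: {event!r}")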

options_preflight async

options_preflight(
    url,
    *,
    request_method="POST",
    request_headers=None,
    headers=None,
    context=None,
)

Send a CORS preflight OPTIONS request and return status and headers.

Source code in ccproxy/testing/endpoints/runner.py
async def options_preflight(
    self,
    url: str,
    *,
    request_method: str = "POST",
    request_headers: Sequence[str] | None = None,
    headers: dict[str, str] | None = None,
    context: dict[str, Any] | None = None,
) -> tuple[int, dict[str, Any]]:
    """Send a CORS preflight OPTIONS request and return status and headers."""

    preflight_headers = self._build_headers({})
    preflight_headers["Access-Control-Request-Method"] = request_method
    if request_headers:
        preflight_headers["Access-Control-Request-Headers"] = ", ".join(
            request_headers
        )
    if headers:
        preflight_headers.update(headers)

    context_data = context or {}

    print(colored_info(f"-> Making CORS preflight request to {url}"))
    logger.info(
        "Making CORS preflight request",
        url=url,
        request_method=request_method,
        request_headers=request_headers,
        **context_data,
    )

    response = await self.client.options(url, headers=preflight_headers)
    status_code = response.status_code
    response_headers = dict(response.headers)

    logger.info(
        "Preflight response received",
        status_code=status_code,
        headers=response_headers,
        **context_data,
    )

    self.extract_and_display_request_id(response_headers, context=context_data)
    print(colored_info(f"-> Preflight response status: HTTP {status_code}"))
    return status_code, response_headers
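
A sketch of exercising CORS negotiation inside an async function; Origin is taken from the cors_origin passed to TestEndpoint, and the Access-Control-Request-* headers are built from the arguments. The assertions are example expectations, not checks performed by the helper:

status, cors_headers = await tester.options_preflight(
    "http://127.0.0.1:8000/v1/chat/completions",
    request_method="POST",
    request_headers=["content-type", "authorization"],
    context={"phase": "cors"},
)

assert status in (200, 204)
assert "access-control-allow-origin" in cors_headers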

validate_response

validate_response(
    response, model_class, *, is_streaming=False
)

Validate response using the provided model_class.

Source code in ccproxy/testing/endpoints/runner.py
def validate_response(
    self, response: dict[str, Any], model_class: Any, *, is_streaming: bool = False
) -> bool:
    """Validate response using the provided model_class."""
    try:
        payload = response
        if model_class is ResponseMessage:
            payload = self._extract_openai_responses_message(response)
        elif model_class is ResponseObject and isinstance(payload.get("text"), str):
            try:
                payload = payload.copy()
                payload["text"] = json.loads(payload["text"])
            except json.JSONDecodeError:
                logger.debug(
                    "Failed to decode response.text as JSON",
                    text_value=payload.get("text"),
                )
        model_class.model_validate(payload)
        print(colored_success(f"[OK] {model_class.__name__} validation passed"))
        logger.info(f"{model_class.__name__} validation passed")
        return True
    except Exception as exc:  # noqa: BLE001
        print(
            colored_error(
                f"[ERROR] {model_class.__name__} validation failed: {exc}"
            )
        )
        logger.error(f"{model_class.__name__} validation failed", error=str(exc))
        return False

validate_sse_event

validate_sse_event(event)

Validate SSE event structure (basic check).

Source code in ccproxy/testing/endpoints/runner.py
def validate_sse_event(self, event: str) -> bool:
    """Validate SSE event structure (basic check)."""
    return event.startswith("data: ")

handle_tool_calls_in_response

handle_tool_calls_in_response(response, *, context=None)

Handle tool calls in a response and return modified response and tool results.

Source code in ccproxy/testing/endpoints/runner.py
def handle_tool_calls_in_response(
    self,
    response: dict[str, Any],
    *,
    context: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Handle tool calls in a response and return modified response and tool results."""
    tool_results: list[dict[str, Any]] = []
    context_data = context or {}

    if "choices" in response:
        for choice in response.get("choices", []):
            message = choice.get("message", {})
            if message.get("tool_calls"):
                print(colored_info("-> Tool calls detected in response"))
                logger.info(
                    "Tool calls detected in response",
                    tool_call_count=len(message["tool_calls"]),
                    **context_data,
                )
                for tool_call in message["tool_calls"]:
                    tool_name = tool_call["function"]["name"]
                    tool_input = json.loads(tool_call["function"]["arguments"])
                    print(colored_info(f"-> Calling tool: {tool_name}"))
                    print(
                        colored_info(
                            f"-> Tool input: {json.dumps(tool_input, indent=2)}"
                        )
                    )

                    logger.info(
                        "Executing tool call",
                        tool_name=tool_name,
                        **context_data,
                    )
                    # Ensure tool_name is a string before calling handle_tool_call
                    safe_tool_name = str(tool_name) if tool_name is not None else ""
                    tool_result = handle_tool_call(safe_tool_name, tool_input)
                    print(
                        colored_success(
                            f"-> Tool result: {json.dumps(tool_result, indent=2)}"
                        )
                    )
                    logger.info(
                        "Tool call completed",
                        tool_name=tool_name,
                        **context_data,
                    )

                    tool_results.append(
                        {
                            "tool_call": tool_call,
                            "result": tool_result,
                            "tool_name": tool_name,
                            "tool_input": tool_input,
                        }
                    )

    if "output" in response:
        for item in response.get("output", []):
            if (
                isinstance(item, dict)
                and item.get("type") == "function_call"
                and item.get("name")
            ):
                tool_name = item.get("name")
                tool_arguments = item.get("arguments", "")

                print(colored_info("-> Tool calls detected in response"))
                print(colored_info(f"-> Calling tool: {tool_name}"))

                try:
                    tool_input = (
                        json.loads(tool_arguments)
                        if isinstance(tool_arguments, str)
                        else tool_arguments
                    )
                    print(
                        colored_info(
                            f"-> Tool input: {json.dumps(tool_input, indent=2)}"
                        )
                    )

                    logger.info(
                        "Executing tool call",
                        tool_name=tool_name,
                        **context_data,
                    )
                    # Ensure tool_name is a string before calling handle_tool_call
                    safe_tool_name = str(tool_name) if tool_name is not None else ""
                    tool_result = handle_tool_call(safe_tool_name, tool_input)
                    print(
                        colored_success(
                            f"-> Tool result: {json.dumps(tool_result, indent=2)}"
                        )
                    )
                    logger.info(
                        "Tool call completed",
                        tool_name=tool_name,
                        **context_data,
                    )

                    tool_results.append(
                        {
                            "tool_call": {
                                "name": tool_name,
                                "arguments": tool_arguments,
                            },
                            "result": tool_result,
                            "tool_name": tool_name,
                            "tool_input": tool_input,
                        }
                    )
                except json.JSONDecodeError as exc:
                    print(
                        colored_error(f"-> Failed to parse tool arguments: {exc}")
                    )
                    print(colored_error(f"-> Raw arguments: {tool_arguments}"))
                    tool_results.append(
                        {
                            "tool_call": {
                                "name": tool_name,
                                "arguments": tool_arguments,
                            },
                            "result": {
                                "error": f"Failed to parse arguments: {exc}"
                            },
                            "tool_name": tool_name,
                            "tool_input": None,
                        }
                    )

    if "content" in response:
        for content_block in response.get("content", []):
            if (
                isinstance(content_block, dict)
                and content_block.get("type") == "tool_use"
            ):
                print(colored_info("-> Tool calls detected in response"))
                tool_name = content_block.get("name")
                tool_input = content_block.get("input", {})
                print(colored_info(f"-> Calling tool: {tool_name}"))
                print(
                    colored_info(
                        f"-> Tool input: {json.dumps(tool_input, indent=2)}"
                    )
                )

                logger.info(
                    "Executing tool call",
                    tool_name=tool_name,
                    **context_data,
                )
                # Ensure tool_name is a string before calling handle_tool_call
                safe_tool_name = str(tool_name) if tool_name is not None else ""
                tool_result = handle_tool_call(safe_tool_name, tool_input)
                print(
                    colored_success(
                        f"-> Tool result: {json.dumps(tool_result, indent=2)}"
                    )
                )
                logger.info(
                    "Tool call completed",
                    tool_name=tool_name,
                    **context_data,
                )

                tool_results.append(
                    {
                        "tool_call": content_block,
                        "result": tool_result,
                        "tool_name": tool_name,
                        "tool_input": tool_input,
                    }
                )

    if tool_results:
        logger.info(
            "Tool call handling completed",
            tool_count=len(tool_results),
            **context_data,
        )

    return response, tool_results
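
The handler recognizes three response shapes: OpenAI chat completions (choices[].message.tool_calls), OpenAI Responses output items of type function_call, and Anthropic-style content blocks of type tool_use. A sketch with a hand-built chat-completions fragment; the tool name and arguments are illustrative and would need to match a tool known to handle_tool_call:

# Hypothetical response fragment in the OpenAI chat-completions shape.
response = {
    "choices": [
        {
            "message": {
                "tool_calls": [
                    {
                        "id": "call_1",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"city": "Paris"}',
                        },
                    }
                ]
            }
        }
    ]
}

response, tool_results = tester.handle_tool_calls_in_response(response)
# Each entry in tool_results carries the tool_call, the executed result,
# the tool_name, and the parsed tool_input.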

display_thinking_blocks

display_thinking_blocks(content)

Display thinking blocks from response content.

Source code in ccproxy/testing/endpoints/runner.py
def display_thinking_blocks(self, content: str) -> None:
    """Display thinking blocks from response content."""
    thinking_blocks = extract_thinking_blocks(content)
    if thinking_blocks:
        print(colored_info("-> Thinking blocks detected"))
        for i, (signature, thinking_content) in enumerate(thinking_blocks, 1):
            print(colored_warning(f"[THINKING BLOCK {i}]"))
            print(colored_warning(f"Signature: {signature}"))
            print(colored_warning("=" * 60))
            print(thinking_content.strip())
            print(colored_warning("=" * 60))

display_response_content

display_response_content(response)

Display response content with thinking block handling.

Source code in ccproxy/testing/endpoints/runner.py
def display_response_content(self, response: dict[str, Any]) -> None:
    """Display response content with thinking block handling."""
    content = ""

    if "choices" in response:
        for choice in response.get("choices", []):
            message = choice.get("message", {})
            if message.get("content"):
                content = message["content"]
                break
    elif "content" in response:
        text_parts = []
        for content_block in response.get("content", []):
            if (
                isinstance(content_block, dict)
                and content_block.get("type") == "text"
            ):
                text_parts.append(content_block.get("text", ""))
        content = "".join(text_parts)
    elif "output" in response:
        text_parts = []
        for item in response.get("output", []):
            if not isinstance(item, dict):
                continue
            if item.get("type") == "message":
                for part in item.get("content", []):
                    if isinstance(part, dict) and part.get("type") in {
                        "output_text",
                        "text",
                    }:
                        text_parts.append(part.get("text", ""))
            elif item.get("type") == "reasoning" and item.get("summary"):
                for part in item.get("summary", []):
                    if isinstance(part, dict) and part.get("text"):
                        text_parts.append(part.get("text"))
        content = "\n".join(text_parts)
    elif isinstance(response.get("text"), str):
        content = response.get("text", "")

    if content:
        self.display_thinking_blocks(content)
        visible_content = extract_visible_content(content)
        if visible_content:
            print(colored_info("-> Response content:"))
            print(visible_content)

run_endpoint_test async

run_endpoint_test(test, index)

Run a single endpoint test and return its result.

Source code in ccproxy/testing/endpoints/runner.py
async def run_endpoint_test(
    self, test: EndpointTest, index: int
) -> EndpointTestResult:
    """Run a single endpoint test and return its result."""
    request_log: list[EndpointRequestResult] = []
    # Seed log_context so the exception handler below can log even if setup fails
    # before the full context is built inside the try block.
    log_context: dict[str, Any] = {"test_name": test.name, "endpoint": test.endpoint}

    try:
        full_url = f"{self.base_url}{test.endpoint}"
        provider_key = test.name.split("_", 1)[0]
        payload = get_request_payload(test)

        log_context = {
            "test_name": test.name,
            "endpoint": test.endpoint,
            "model": test.model,
            "stream": test.stream,
        }

        template = REQUEST_DATA[test.request]
        model_class = template.get("model_class")
        chunk_model_class = template.get("chunk_model_class")
        accumulator_class = template.get(
            "accumulator_class"
        ) or PROVIDER_TOOL_ACCUMULATORS.get(provider_key)

        has_tools = "tools" in payload

        logger.info(
            "Running endpoint test",
            test_name=test.name,
            endpoint=test.endpoint,
            stream=test.stream,
            has_tools=has_tools,
            accumulator_class=getattr(accumulator_class, "__name__", None)
            if accumulator_class
            else None,
            model_class=getattr(model_class, "__name__", None)
            if model_class
            else None,
        )

        if has_tools:
            print(colored_info("-> This test includes function tools"))

        if test.stream:
            stream_events, stream_request_results = await self.post_stream(
                full_url,
                payload,
                context={**log_context, "phase": "initial"},
            )
            request_log.extend(stream_request_results)

            (
                full_content,
                finish_reason,
                stream_accumulator,
                processed_events,
            ) = self._consume_stream_events(
                stream_events,
                chunk_model_class,
                accumulator_class,
                context={**log_context, "phase": "initial"},
            )

            if (
                not full_content
                and stream_accumulator
                and getattr(stream_accumulator, "text_content", None)
            ):
                full_content = stream_accumulator.text_content

            if processed_events == 0:
                message = f"{test.name}: streaming response ended without emitting any events"
                print(colored_warning(message))
                logger.warning(
                    "Streaming response empty",
                    event_count=processed_events,
                    **log_context,
                )
                return EndpointTestResult(
                    test=test,
                    index=index,
                    success=False,
                    error=message,
                    request_results=request_log,
                )

            logger.info(
                "Stream events processed",
                event_count=processed_events,
                finish_reason=finish_reason,
                content_preview=(full_content[:120] if full_content else None),
                has_tools=has_tools,
                **log_context,
            )

            if full_content:
                self.display_thinking_blocks(full_content)
                visible_content = extract_visible_content(full_content)
                if visible_content:
                    print(colored_info("-> Accumulated response:"))
                    print(visible_content)

            if stream_accumulator and processed_events > 0:
                aggregated_snapshot = stream_accumulator.rebuild_response_object(
                    {"choices": [], "content": [], "tool_calls": []}
                )
                if any(
                    aggregated_snapshot.get(key)
                    for key in ("choices", "content", "tool_calls", "output")
                ):
                    print(colored_info("-> Aggregated response object (partial):"))
                    print(json.dumps(aggregated_snapshot, indent=2))
                    self.display_response_content(aggregated_snapshot)
                    logger.debug(
                        "Stream accumulator snapshot",
                        snapshot_keys=[
                            key
                            for key, value in aggregated_snapshot.items()
                            if value
                        ],
                        **log_context,
                    )

            tool_results: list[dict[str, Any]] = []
            if has_tools and stream_accumulator:
                complete_tool_calls = stream_accumulator.get_complete_tool_calls()
                if (
                    finish_reason in ["tool_calls", "tool_use"]
                    or complete_tool_calls
                ):
                    tool_defs = (
                        payload.get("tools") if isinstance(payload, dict) else None
                    )
                    tool_results = self._execute_accumulated_tool_calls(
                        complete_tool_calls,
                        tool_defs,
                        context={**log_context, "phase": "tool_execution"},
                    )

                    if tool_results:
                        print(
                            colored_info(
                                "-> Sending tool results back to LLM for final response"
                            )
                        )
                        logger.info(
                            "Tool results ready for continuation",
                            tool_count=len(tool_results),
                            **log_context,
                        )

                        format_type = self._get_format_type_for_test(test)

                        response = {
                            "choices": [{"finish_reason": finish_reason}],
                            "content": full_content,
                        }
                        response["tool_calls"] = complete_tool_calls

                        format_tools = FORMAT_TOOLS[format_type]
                        continuation_payload = (
                            format_tools.build_continuation_request(
                                payload, response, tool_results
                            )
                        )

                        (
                            continuation_events,
                            continuation_request_results,
                        ) = await self.post_stream(
                            full_url,
                            continuation_payload,
                            context={**log_context, "phase": "continuation"},
                        )
                        request_log.extend(continuation_request_results)
                        print(colored_info("Final response (with tool results):"))
                        (
                            continuation_content,
                            _,
                            continuation_accumulator,
                            continuation_events_processed,
                        ) = self._consume_stream_events(
                            continuation_events,
                            chunk_model_class,
                            accumulator_class,
                            context={**log_context, "phase": "continuation"},
                        )

                        if continuation_events_processed == 0:
                            message = f"{test.name}: continuation streaming response contained no events"
                            print(colored_warning(message))
                            logger.warning(
                                "Continuation response empty",
                                event_count=continuation_events_processed,
                                **log_context,
                            )
                            return EndpointTestResult(
                                test=test,
                                index=index,
                                success=False,
                                error=message,
                                request_results=request_log,
                            )

                        logger.info(
                            "Continuation stream processed",
                            event_count=continuation_events_processed,
                            content_preview=(
                                continuation_content[:120]
                                if continuation_content
                                else None
                            ),
                            **log_context,
                        )

                        if continuation_content:
                            self.display_thinking_blocks(continuation_content)
                            visible_content = extract_visible_content(
                                continuation_content
                            )
                            if visible_content:
                                print(colored_info("-> Accumulated response:"))
                                print(visible_content)

                        if (
                            continuation_accumulator
                            and continuation_events_processed > 0
                        ):
                            aggregated_snapshot = (
                                continuation_accumulator.rebuild_response_object(
                                    {"choices": [], "content": [], "tool_calls": []}
                                )
                            )
                            if any(
                                aggregated_snapshot.get(key)
                                for key in ("choices", "content", "tool_calls")
                            ):
                                print(
                                    colored_info(
                                        "-> Aggregated response object (partial):"
                                    )
                                )
                                print(json.dumps(aggregated_snapshot, indent=2))
                                self.display_response_content(aggregated_snapshot)
                                logger.debug(
                                    "Continuation accumulator snapshot",
                                    snapshot_keys=[
                                        key
                                        for key, value in aggregated_snapshot.items()
                                        if value
                                    ],
                                    **log_context,
                                )

        else:
            response, initial_request_result = await self.post_json(
                full_url,
                payload,
                context={**log_context, "phase": "initial"},
                capture_result=True,
            )
            request_log.append(initial_request_result)

            print(json.dumps(response, indent=2))

            json_tool_results: list[dict[str, Any]] = []
            if has_tools:
                response, json_tool_results = self.handle_tool_calls_in_response(
                    response, context={**log_context, "phase": "tool_detection"}
                )

                if json_tool_results:
                    print(
                        colored_info(
                            "-> Sending tool results back to LLM for final response"
                        )
                    )
                    logger.info(
                        "Tool results ready for continuation",
                        tool_count=len(json_tool_results),
                        **log_context,
                    )

                    format_type = self._get_format_type_for_test(test)
                    format_tools = FORMAT_TOOLS[format_type]
                    continuation_payload = format_tools.build_continuation_request(
                        payload, response, json_tool_results
                    )

                    (
                        continuation_response,
                        continuation_request_result,
                    ) = await self.post_json(
                        full_url,
                        continuation_payload,
                        context={**log_context, "phase": "continuation"},
                        capture_result=True,
                    )
                    request_log.append(continuation_request_result)
                    print(colored_info("Final response (with tool results):"))
                    print(json.dumps(continuation_response, indent=2))
                    self.display_response_content(continuation_response)
                    preview_data = json.dumps(
                        continuation_response, ensure_ascii=False
                    )
                    logger.info(
                        "Continuation response received",
                        tool_count=len(json_tool_results),
                        content_preview=preview_data[:120],
                        **log_context,
                    )

            self.display_response_content(response)

            if "error" not in response and model_class:
                self.validate_response(response, model_class, is_streaming=False)

        print(colored_success(f"[OK] Test {test.name} completed successfully"))
        logger.info("Test completed successfully", **log_context)
        return EndpointTestResult(
            test=test,
            index=index,
            success=True,
            request_results=request_log,
        )

    except Exception as exc:  # noqa: BLE001
        print(colored_error(f"[FAIL] Test {test.name} failed: {exc}"))
        logger.error(
            "Test execution failed",
            **log_context,
            error=str(exc),
            exc_info=exc,
        )
        return EndpointTestResult(
            test=test,
            index=index,
            success=False,
            error=str(exc),
            exception=exc,
            request_results=request_log,
        )

validate_stream_chunk

validate_stream_chunk(chunk, chunk_model_class)

Validate a streaming chunk against the provided model class.

Source code in ccproxy/testing/endpoints/runner.py
def validate_stream_chunk(
    self, chunk: dict[str, Any], chunk_model_class: Any
) -> bool:
    """Validate a streaming chunk against the provided model class."""

    # Some providers emit housekeeping chunks (e.g. pure filter results) that
    # do not include the standard fields expected by the OpenAI schema. Skip
    # validation for those so we only flag real contract violations.
    if not chunk.get("choices") and "model" not in chunk:
        logger.debug(
            "Skipping validation for non-standard chunk",
            chunk_keys=list(chunk.keys()),
        )
        return True

    if chunk.get("type") == "message" and "choices" not in chunk:
        logger.debug(
            "Skipping validation for provider message chunk",
            chunk_type=chunk.get("type"),
            chunk_keys=list(chunk.keys()),
        )
        return True

    try:
        chunk_model_class.model_validate(chunk)
        return True
    except Exception as exc:  # noqa: BLE001
        if self._has_tool_calls_in_chunk(chunk):
            logger.debug(
                "Validation failed for tool call chunk (expected)", error=str(exc)
            )
            return True

        print(
            colored_error(
                f"[ERROR] {chunk_model_class.__name__} chunk validation failed: {exc}"
            )
        )
        return False

run_all_tests async

run_all_tests(selected_indices=None)

Run endpoint tests, optionally filtered by selected indices.

Source code in ccproxy/testing/endpoints/runner.py
async def run_all_tests(
    self, selected_indices: list[int] | None = None
) -> EndpointTestRunSummary:
    """Run endpoint tests, optionally filtered by selected indices."""
    print(colored_header("CCProxy Endpoint Tests"))
    print(colored_info(f"Test endpoints at {self.base_url}"))
    logger.info("Starting endpoint tests", base_url=self.base_url)

    total_available = len(ENDPOINT_TESTS)

    if selected_indices is not None:
        indices_to_run = [i for i in selected_indices if 0 <= i < total_available]
        logger.info(
            "Running selected tests",
            selected_count=len(indices_to_run),
            total_count=total_available,
            selected_indices=selected_indices,
        )
    else:
        indices_to_run = list(range(total_available))
        logger.info("Running all tests", test_count=total_available)

    total_to_run = len(indices_to_run)
    print(
        colored_info(
            f"Selected tests: {total_to_run} of {total_available} available"
        )
    )

    if total_to_run == 0:
        print(colored_warning("No tests selected; nothing to execute."))
        logger.warning("No tests selected for execution")
        return EndpointTestRunSummary(
            base_url=self.base_url,
            results=[],
            successful_count=0,
            failure_count=0,
        )

    results: list[EndpointTestResult] = []
    successful_tests = 0
    failed_tests = 0

    for position, index in enumerate(indices_to_run, 1):
        test = ENDPOINT_TESTS[index]

        progress_message = (
            f"[{position}/{total_to_run}] Running test #{index + 1}: {test.name}"
        )
        if test.description and test.description != test.name:
            progress_message += f" - {test.description}"

        print(colored_progress(progress_message))
        logger.info(
            "Dispatching endpoint test",
            test_name=test.name,
            endpoint=test.endpoint,
            ordinal=position,
            total=total_to_run,
            stream=test.stream,
            model=test.model,
        )

        result = await self.run_endpoint_test(test, index)
        results.append(result)

        if result.success:
            successful_tests += 1
        else:
            failed_tests += 1

    error_messages = [result.error for result in results if result.error]

    summary = EndpointTestRunSummary(
        base_url=self.base_url,
        results=results,
        successful_count=successful_tests,
        failure_count=failed_tests,
        errors=error_messages,
    )

    if summary.failure_count == 0:
        print(
            colored_success(
                f"\nAll {summary.total} endpoint tests completed successfully."
            )
        )
        logger.info(
            "All endpoint tests completed successfully",
            total_tests=summary.total,
            successful=summary.successful_count,
            failed=summary.failure_count,
            error_count=len(summary.errors),
        )
    else:
        print(
            colored_warning(
                f"\nTest run completed: {summary.successful_count} passed, "
                f"{summary.failure_count} failed (out of {summary.total})."
            )
        )
        logger.warning(
            "Endpoint tests completed with failures",
            total_tests=summary.total,
            successful=summary.successful_count,
            failed=summary.failure_count,
            errors=summary.errors,
            error_count=len(summary.errors),
        )

        if summary.failed_results:
            print(colored_error("Failed tests:"))
            for failed in summary.failed_results:
                error_detail = failed.error or "no error message provided"
                print(
                    colored_error(
                        f"  - {failed.test.name} (#{failed.index + 1}): {error_detail}"
                    )
                )

        additional_errors = [err for err in summary.errors if err]
        if additional_errors and len(additional_errors) > summary.failure_count:
            print(colored_error("Additional errors:"))
            for err in additional_errors:
                print(colored_error(f"  - {err}"))

    return summary

cli_main

cli_main(argv=None)

Run the CLI test harness.

Source code in ccproxy/testing/endpoints/cli.py
def main(argv: list[str] | None = None) -> None:
    """Run the CLI test harness."""
    parser = argparse.ArgumentParser(
        description=(
            "Test CCProxy endpoints with response validation, function tools, "
            "thinking mode, and structured output support"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
{list_available_tests()}

Test selection examples:
  --tests 1                           # Run test 1 only
  --tests 1,3,5                       # Run tests 1, 3, and 5
  --tests 1..3                        # Run tests 1 through 3
  --tests 4..                         # Run tests 4 through end
  --tests ..3                         # Run tests 1 through 3
  --tests 1,4..6,8                    # Run test 1, tests 4-6, and test 8
  --tests copilot_chat_completions    # Run test by exact name
  --tests copilot                     # Run all tests containing "copilot"
  --tests "copilot_.*_stream"         # Run all copilot streaming tests (regex)
  --tests ".*_stream"                 # Run all streaming tests (regex)
  --tests "claude_.*"                 # Run all claude tests (regex)
  --tests 1,copilot_.*_stream,codex   # Mix indices, regex, and partial names

Feature-specific test patterns:
  --tests ".*_tools.*"                # Run all function tool tests
  --tests ".*_thinking.*"             # Run all thinking mode tests
  --tests ".*_structured.*"           # Run all structured output tests
  --tools                             # Add function tools to compatible tests
  --thinking                          # Use thinking-capable models where available
  --structured                        # Enable structured output formatting
""",
    )
    parser.add_argument(
        "--base",
        default="http://127.0.0.1:8000",
        help="Base URL for the API server (default: http://127.0.0.1:8000)",
    )
    parser.add_argument(
        "--tests",
        help=(
            "Select tests by index, name, regex pattern, or ranges (e.g., "
            "1,2,3 or copilot_.*_stream or 1..3)"
        ),
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List available tests and exit (don't run any tests)",
    )
    parser.add_argument(
        "--tools",
        action="store_true",
        help="Add function tools to compatible test requests (weather, distance, calculate)",
    )
    parser.add_argument(
        "--thinking",
        action="store_true",
        help="Enable thinking mode for OpenAI requests (uses o1-preview/o1-mini models)",
    )
    parser.add_argument(
        "--structured",
        action="store_true",
        help="Enable structured output mode for detailed response analysis",
    )
    parser.add_argument(
        "--show-tools",
        action="store_true",
        help="Display available function tools and exit",
    )
    parser.add_argument(
        "-v",
        action="store_true",
        help="Set log level to INFO",
    )
    parser.add_argument(
        "-vv",
        action="store_true",
        help="Set log level to DEBUG",
    )
    parser.add_argument(
        "-vvv",
        action="store_true",
        help="Set log level to DEBUG (same as -vv)",
    )
    parser.add_argument(
        "--log-level",
        choices=["warn", "info", "debug", "error"],
        default="warn",
        help="Set log level explicitly (default: warn)",
    )

    args = parser.parse_args(argv)

    log_level = args.log_level
    if args.v:
        log_level = "info"
    elif args.vv or args.vvv:
        log_level = "debug"

    setup_logging(log_level)

    if args.show_tools:
        print(colored_header("Available Function Tools"))
        for tool in create_openai_tools():
            func = tool["function"]
            print(f"\n{colored_info('Tool:')} {func['name']}")
            print(f"{colored_info('Description:')} {func['description']}")
            print(
                f"{colored_info('Parameters:')} {json.dumps(func['parameters'], indent=2)}"
            )
        sys.exit(0)

    if args.list:
        print(list_available_tests())
        if args.tools or args.thinking or args.structured:
            print(colored_header("Available Feature Flags"))
            if args.tools:
                print(colored_info("Function tools will be added to compatible tests"))
            if args.thinking:
                print(colored_info("Thinking mode will be enabled for OpenAI tests"))
            if args.structured:
                print(colored_info("Structured output mode will be enabled"))
        sys.exit(0)

    if args.tools or args.thinking or args.structured:
        print(colored_header("Global Feature Flags Applied"))
        if args.tools:
            print(colored_info("→ Function tools enabled for compatible tests"))
        if args.thinking:
            print(colored_info("→ Thinking mode enabled for OpenAI tests"))
        if args.structured:
            print(colored_info("→ Structured output mode enabled"))

    try:
        summary = asyncio.run(
            run_endpoint_tests_async(base_url=args.base, tests=args.tests)
        )
    except ValueError as exc:
        structlog.get_logger(__name__).error(
            "Invalid test selection format", selection=args.tests, error=str(exc)
        )
        sys.exit(1)
    except KeyboardInterrupt:
        structlog.get_logger(__name__).info("Tests interrupted by user")
        sys.exit(1)
    except Exception as exc:  # noqa: BLE001
        structlog.get_logger(__name__).error(
            "Test execution failed", error=str(exc), exc_info=exc
        )
        sys.exit(1)

    if summary.failure_count:
        sys.exit(1)
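
Because cli_main accepts an explicit argv list, the harness can be driven programmatically as well as from a shell; a sketch, assuming cli_main is exported from ccproxy.testing.endpoints as documented here (note that it calls sys.exit, terminating the process with status 1 when any selected test fails):

from ccproxy.testing.endpoints import cli_main

# Run tests 1-3 against a local server with INFO-level logging.
cli_main(["--base", "http://127.0.0.1:8000", "--tests", "1..3", "-v"])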

setup_logging

setup_logging(level='warn')

Set up structured logging with the specified level.

Source code in ccproxy/testing/endpoints/cli.py
def setup_logging(level: str = "warn") -> None:
    """Setup structured logging with specified level."""
    log_level_map = {
        "warn": logging.WARNING,
        "info": logging.INFO,
        "debug": logging.DEBUG,
        "error": logging.ERROR,
    }

    logging.basicConfig(
        level=log_level_map.get(level, logging.WARNING),
        format="%(message)s",
    )

    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.CallsiteParameterAdder(
                parameters=[
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                ]
            ),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.dev.ConsoleRenderer(colors=True),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

list_available_tests

list_available_tests()

Generate a formatted list of available tests for help text.

Source code in ccproxy/testing/endpoints/config.py
def list_available_tests() -> str:
    """Generate a formatted list of available tests for help text."""

    lines = ["Available tests:"]
    for i, test in enumerate(ENDPOINT_TESTS, 1):
        lines.append(f"  {i:2d}. {test.name:<30} - {test.description}")
    return "\n".join(lines)

resolve_selected_indices

resolve_selected_indices(selection)

Normalize test selection input into 0-based indices.

Source code in ccproxy/testing/endpoints/runner.py
def resolve_selected_indices(
    selection: str | Sequence[int] | None,
) -> list[int] | None:
    """Normalize test selection input into 0-based indices."""

    if selection is None:
        return None

    total_tests = len(ENDPOINT_TESTS)

    if isinstance(selection, str):
        indices = parse_test_selection(selection, total_tests)
    else:
        try:
            seen: set[int] = set()
            indices = []
            for raw in selection:
                index = int(raw)
                if index in seen:
                    continue
                seen.add(index)
                indices.append(index)
        except TypeError as exc:
            raise TypeError(
                "tests must be a selection string or a sequence of integers"
            ) from exc

        indices.sort()

    for index in indices:
        if index < 0 or index >= total_tests:
            raise ValueError(
                f"Test index {index} is out of range (0-{total_tests - 1})"
            )

    return indices
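
A few illustrative calls; string selections go through parse_test_selection (the same index, range, name, and regex syntax as the CLI --tests flag), while integer sequences are de-duplicated and sorted, and any out-of-range index raises ValueError:

from ccproxy.testing.endpoints import resolve_selected_indices

resolve_selected_indices(None)        # None -> run every test
resolve_selected_indices([3, 1, 1])   # -> [1, 3]: de-duplicated, sorted, 0-based
resolve_selected_indices("1..3")      # parsed by parse_test_selection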

run_endpoint_tests

run_endpoint_tests(
    base_url="http://127.0.0.1:8000", tests=None
)

Convenience wrapper to run endpoint tests from synchronous code.

Source code in ccproxy/testing/endpoints/runner.py
def run_endpoint_tests(
    base_url: str = "http://127.0.0.1:8000",
    tests: str | Sequence[int] | None = None,
) -> EndpointTestRunSummary:
    """Convenience wrapper to run endpoint tests from synchronous code."""

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        raise RuntimeError(
            "run_endpoint_tests() cannot be called while an event loop is running; "
            "use await run_endpoint_tests_async(...) instead"
        )

    return asyncio.run(run_endpoint_tests_async(base_url=base_url, tests=tests))
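
A sketch of the synchronous entry point, assuming the same package-level import as above; it refuses to run while an event loop is already active, so plain scripts call it directly and async code should await run_endpoint_tests_async instead:

from ccproxy.testing.endpoints import run_endpoint_tests

summary = run_endpoint_tests(
    base_url="http://127.0.0.1:8000",
    tests="copilot_.*_stream",  # same selection syntax as the CLI --tests flag
)
print(f"{summary.successful_count}/{summary.total} passed")
if not summary.all_passed():
    raise SystemExit(1)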

run_endpoint_tests_async async

run_endpoint_tests_async(
    base_url="http://127.0.0.1:8000", tests=None
)

Execute endpoint tests asynchronously and return the summary.

Source code in ccproxy/testing/endpoints/runner.py
async def run_endpoint_tests_async(
    base_url: str = "http://127.0.0.1:8000",
    tests: str | Sequence[int] | None = None,
) -> EndpointTestRunSummary:
    """Execute endpoint tests asynchronously and return the summary."""

    selected_indices = resolve_selected_indices(tests)
    if selected_indices is not None and not selected_indices:
        raise ValueError("No valid tests selected")

    async with TestEndpoint(base_url=base_url) as tester:
        return await tester.run_all_tests(selected_indices)