
ccproxy.testing

Testing utilities and mock response generation for CCProxy.

This package provides comprehensive testing utilities, including:

- Mock response generation for bypass mode
- Request payload builders for dual-format testing
- Response processing and metrics collection
- Traffic pattern generation and scenario management
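
A quick orientation sketch, assuming these classes are re-exported at the package level as this listing suggests; if not, import them from the submodules noted under each class below:

from ccproxy.testing import (
    MockResponseConfig,
    PayloadBuilder,
    RealisticMockResponseGenerator,
    ScenarioGenerator,
    TrafficConfig,
)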

MockResponseConfig

Bases: BaseModel

Configuration for realistic mock responses.

RequestScenario

Bases: BaseModel

Individual request scenario configuration.

TrafficConfig

Bases: BaseModel

Configuration for traffic generation scenarios.

TrafficMetrics

Bases: BaseModel

Enhanced metrics for dual-format testing.

MessageContentGenerator

MessageContentGenerator()

Generate realistic message content for testing.

Source code in ccproxy/testing/content_generation.py
def __init__(self) -> None:
    self.response_templates = self._load_response_templates()
    self.request_templates = self._load_request_templates()

get_request_message_content

get_request_message_content(message_type)

Get request message content based on type.

Source code in ccproxy/testing/content_generation.py
def get_request_message_content(self, message_type: str) -> str:
    """Get request message content based on type."""
    if message_type in self.request_templates:
        return random.choice(self.request_templates[message_type])
    else:
        # Fallback to short message for unknown types
        return random.choice(self.request_templates["short"])

get_response_content

get_response_content(message_type, model)

Generate response content with realistic token counts.

Source code in ccproxy/testing/content_generation.py
def get_response_content(
    self, message_type: str, model: str
) -> tuple[str, int, int]:
    """Generate response content with realistic token counts."""
    # Select base template
    if message_type == "tool_use":
        base_content = random.choice(self.response_templates["tool_use"])
        # Add calculation result
        result = random.randint(1, 1000)
        content = f"{base_content} The result is {result}."
    elif message_type in self.response_templates:
        content = random.choice(self.response_templates[message_type])
    else:
        # Mix of different lengths for unknown types
        template_type = random.choice(["short", "medium", "long"])
        content = random.choice(self.response_templates[template_type])

    # Calculate realistic token counts based on content
    # Rough estimate: ~4 characters per token
    estimated_output_tokens = max(1, len(content) // 4)

    # Add some randomness but keep it realistic
    output_tokens = random.randint(
        max(1, estimated_output_tokens - 10), estimated_output_tokens + 20
    )

    # Input tokens based on typical request sizes (10-500 range)
    input_tokens = random.randint(10, 500)

    return content, input_tokens, output_tokens
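
A minimal usage sketch; content and token counts are randomized on every call, and the model string is only illustrative:

generator = MessageContentGenerator()

# Pick a user prompt for a given message type ("short" is also the
# fallback used for unknown types).
prompt = generator.get_request_message_content("short")

# Produce an assistant reply plus estimated input/output token counts.
content, input_tokens, output_tokens = generator.get_response_content(
    "tool_use", "claude-3-5-sonnet-20241022"
)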

PayloadBuilder

PayloadBuilder()

Build request payloads for different API formats.

Source code in ccproxy/testing/content_generation.py
def __init__(self) -> None:
    self.content_generator = MessageContentGenerator()

build_anthropic_payload

build_anthropic_payload(scenario)

Build Anthropic format payload.

Source code in ccproxy/testing/content_generation.py
def build_anthropic_payload(self, scenario: RequestScenario) -> dict[str, Any]:
    """Build Anthropic format payload."""
    payload = {
        "model": scenario.model,
        "messages": [
            {
                "role": "user",
                "content": self.content_generator.get_request_message_content(
                    scenario.message_type
                ),
            }
        ],
        "stream": scenario.streaming,
        "max_tokens": random.randint(100, 4000),  # Realistic token limits
    }

    if scenario.message_type == "tool_use":
        payload["tools"] = [
            {
                "name": "calculator",
                "description": "Perform basic calculations",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "expression": {
                            "type": "string",
                            "description": "Math expression to evaluate",
                        }
                    },
                    "required": ["expression"],
                },
            }
        ]

    return payload

build_openai_payload

build_openai_payload(scenario)

Build OpenAI format payload.

Source code in ccproxy/testing/content_generation.py
def build_openai_payload(self, scenario: RequestScenario) -> dict[str, Any]:
    """Build OpenAI format payload."""
    messages = [
        {
            "role": "user",
            "content": self.content_generator.get_request_message_content(
                scenario.message_type
            ),
        }
    ]

    payload = {
        "model": scenario.model,
        "messages": messages,
        "stream": scenario.streaming,
        "max_tokens": random.randint(100, 4000),  # Realistic token limits
    }

    if scenario.message_type == "tool_use":
        payload["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": "calculator",
                    "description": "Perform basic calculations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "expression": {
                                "type": "string",
                                "description": "Math expression to evaluate",
                            }
                        },
                        "required": ["expression"],
                    },
                },
            }
        ]

    return payload

build_payload

build_payload(scenario)

Build request payload based on scenario format.

Source code in ccproxy/testing/content_generation.py
def build_payload(self, scenario: RequestScenario) -> dict[str, Any]:
    """Build request payload based on scenario format."""
    # Use custom payload if provided
    if scenario.custom_payload:
        return scenario.custom_payload

    # Build format-specific payload
    if scenario.api_format == "openai":
        return self.build_openai_payload(scenario)
    else:
        return self.build_anthropic_payload(scenario)
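
A sketch of building a payload from a scenario. Only the fields that build_payload actually reads (model, message_type, streaming, api_format, custom_payload) are shown; the remaining RequestScenario fields are assumed to have defaults:

builder = PayloadBuilder()

scenario = RequestScenario(  # unspecified fields assumed to have defaults
    model="claude-3-5-sonnet-20241022",
    message_type="tool_use",
    streaming=False,
    api_format="anthropic",
)

payload = builder.build_payload(scenario)  # dispatches to build_anthropic_payload
# payload["tools"] holds the calculator tool definition for tool_use scenarios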

RealisticMockResponseGenerator

RealisticMockResponseGenerator(config=None)

Generate realistic mock responses with proper randomization.

Source code in ccproxy/testing/mock_responses.py
def __init__(self, config: MockResponseConfig | None = None):
    self.config = config or MockResponseConfig()
    self.content_generator: MessageContentGenerator = MessageContentGenerator()

generate_response_content

generate_response_content(message_type, model)

Generate response content with realistic token counts.

Source code in ccproxy/testing/mock_responses.py
def generate_response_content(
    self, message_type: str, model: str
) -> tuple[str, int, int]:
    """Generate response content with realistic token counts."""
    return self.content_generator.get_response_content(message_type, model)

generate_cache_tokens

generate_cache_tokens()

Generate realistic cache token counts.

Source code in ccproxy/testing/mock_responses.py
def generate_cache_tokens(self) -> tuple[int, int]:
    """Generate realistic cache token counts."""
    if random.random() < self.config.cache_token_probability:
        cache_read = random.randint(*self.config.cache_read_range)
        cache_write = random.randint(*self.config.cache_write_range)
        return cache_read, cache_write
    return 0, 0

should_simulate_error

should_simulate_error()

Determine if this response should be an error.

Source code in ccproxy/testing/mock_responses.py
def should_simulate_error(self) -> bool:
    """Determine if this response should be an error."""
    return (
        self.config.simulate_errors
        and random.random() < self.config.error_probability
    )

generate_error_response

generate_error_response(api_format)

Generate realistic error response.

Source code in ccproxy/testing/mock_responses.py
def generate_error_response(self, api_format: str) -> tuple[dict[str, Any], int]:
    """Generate realistic error response."""
    error_types = [
        {
            "type": "rate_limit_error",
            "message": "Rate limit exceeded. Please try again later.",
            "status_code": 429,
        },
        {
            "type": "invalid_request_error",
            "message": "Invalid request format.",
            "status_code": 400,
        },
        {
            "type": "overloaded_error",
            "message": "Service temporarily overloaded.",
            "status_code": 503,
        },
    ]

    error = random.choice(error_types)
    status_code: int = error["status_code"]  # type: ignore[assignment]

    if api_format == "openai":
        return {
            "error": {
                "message": error["message"],
                "type": error["type"],
                "code": error["type"],
            }
        }, status_code
    else:
        return {
            "type": "error",
            "error": {"type": error["type"], "message": error["message"]},
        }, status_code
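
A sketch of the error-simulation path. The MockResponseConfig keyword arguments mirror the fields the generator reads (simulate_errors, error_probability); that they are accepted as constructor arguments is an assumption:

generator = RealisticMockResponseGenerator(
    MockResponseConfig(simulate_errors=True, error_probability=1.0)
)

if generator.should_simulate_error():
    body, status_code = generator.generate_error_response("openai")
    # e.g. status_code == 429 and body["error"]["type"] == "rate_limit_error"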

generate_realistic_anthropic_stream

generate_realistic_anthropic_stream(
    request_id,
    model,
    content,
    input_tokens,
    output_tokens,
    cache_read_tokens,
    cache_write_tokens,
)

Generate realistic Anthropic streaming chunks.

Source code in ccproxy/testing/mock_responses.py
def generate_realistic_anthropic_stream(
    self,
    request_id: str,
    model: str,
    content: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int,
    cache_write_tokens: int,
) -> list[dict[str, Any]]:
    """Generate realistic Anthropic streaming chunks."""

    chunks = []

    # Message start
    chunks.append(
        {
            "type": "message_start",
            "message": {
                "id": request_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": input_tokens, "output_tokens": 0},
            },
        }
    )

    # Content block start
    chunk_start: dict[str, Any] = {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    }
    chunks.append(chunk_start)

    # Split content into realistic chunks (by words)
    words = content.split()
    chunk_sizes = []

    # Generate realistic chunk sizes
    i = 0
    while i < len(words):
        # Random chunk size between 1-5 words
        chunk_size = random.randint(1, min(5, len(words) - i))
        chunk_sizes.append(chunk_size)
        i += chunk_size

    # Generate content deltas
    word_index = 0
    for chunk_size in chunk_sizes:
        chunk_words = words[word_index : word_index + chunk_size]
        chunk_text = (
            " " + " ".join(chunk_words) if word_index > 0 else " ".join(chunk_words)
        )

        chunk_delta: dict[str, Any] = {
            "type": "content_block_delta",
            "index": 0,
            "delta": {"type": "text_delta", "text": chunk_text},
        }
        chunks.append(chunk_delta)
        word_index += chunk_size

    # Content block stop
    chunk_stop: dict[str, Any] = {"type": "content_block_stop", "index": 0}
    chunks.append(chunk_stop)

    # Message delta with final usage
    chunks.append(
        {
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": {
                "output_tokens": output_tokens,
                "cache_creation_input_tokens": cache_write_tokens,
                "cache_read_input_tokens": cache_read_tokens,
            },
        }
    )

    # Message stop
    chunks.append({"type": "message_stop"})

    return chunks
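
A sketch that renders the chunk list as server-sent events, the framing in which Anthropic streaming responses are normally delivered (the SSE wrapping below is illustrative and not part of this module):

import json

generator = RealisticMockResponseGenerator()
chunks = generator.generate_realistic_anthropic_stream(
    request_id="msg_mock_001",
    model="claude-3-5-sonnet-20241022",
    content="Hello from the mock stream.",
    input_tokens=25,
    output_tokens=8,
    cache_read_tokens=0,
    cache_write_tokens=0,
)

for chunk in chunks:
    print(f"event: {chunk['type']}")
    print(f"data: {json.dumps(chunk)}\n")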

generate_realistic_openai_stream

generate_realistic_openai_stream(
    request_id, model, content, input_tokens, output_tokens
)

Generate realistic OpenAI streaming chunks by converting Anthropic format.

Source code in ccproxy/testing/mock_responses.py
def generate_realistic_openai_stream(
    self,
    request_id: str,
    model: str,
    content: str,
    input_tokens: int,
    output_tokens: int,
) -> list[dict[str, Any]]:
    """Generate realistic OpenAI streaming chunks by converting Anthropic format."""

    # Generate Anthropic chunks first
    anthropic_chunks = self.generate_realistic_anthropic_stream(
        request_id, model, content, input_tokens, output_tokens, 0, 0
    )

    # Convert to OpenAI format
    openai_chunks = []
    for chunk in anthropic_chunks:
        # Use simplified conversion logic
        if chunk.get("type") == "message_start":
            openai_chunks.append(
                {
                    "id": f"chatcmpl-{request_id}",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"role": "assistant", "content": ""},
                            "finish_reason": None,
                        }
                    ],
                }
            )
        elif chunk.get("type") == "content_block_delta":
            delta_text = chunk.get("delta", {}).get("text", "")
            openai_chunks.append(
                {
                    "id": f"chatcmpl-{request_id}",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": delta_text},
                            "finish_reason": None,
                        }
                    ],
                }
            )
        elif chunk.get("type") == "message_stop":
            openai_chunks.append(
                {
                    "id": f"chatcmpl-{request_id}",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
                }
            )

    return openai_chunks
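
A sketch showing the shape of the converted stream: the first chunk carries the assistant role, the middle chunks carry text deltas, and the final chunk sets finish_reason:

openai_chunks = RealisticMockResponseGenerator().generate_realistic_openai_stream(
    request_id="mock001",
    model="claude-3-5-sonnet-20241022",
    content="Hello from the mock stream.",
    input_tokens=25,
    output_tokens=8,
)

assert openai_chunks[0]["choices"][0]["delta"]["role"] == "assistant"
assert openai_chunks[-1]["choices"][0]["finish_reason"] == "stop"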

calculate_realistic_cost

calculate_realistic_cost(
    input_tokens,
    output_tokens,
    model,
    cache_read_tokens,
    cache_write_tokens,
)

Calculate realistic cost based on current Claude pricing.

Source code in ccproxy/testing/mock_responses.py
def calculate_realistic_cost(
    self,
    input_tokens: int,
    output_tokens: int,
    model: str,
    cache_read_tokens: int,
    cache_write_tokens: int,
) -> float:
    """Calculate realistic cost based on current Claude pricing."""

    # Simplified pricing (should use actual cost calculator)
    if "sonnet" in model.lower():
        input_cost_per_token = 0.000003  # $3 per million tokens
        output_cost_per_token = 0.000015  # $15 per million tokens
    elif "haiku" in model.lower():
        input_cost_per_token = 0.00000025  # $0.25 per million tokens
        output_cost_per_token = 0.00000125  # $1.25 per million tokens
    else:
        input_cost_per_token = 0.000003
        output_cost_per_token = 0.000015

    base_cost = (
        input_tokens * input_cost_per_token + output_tokens * output_cost_per_token
    )

    # Cache costs (typically lower)
    cache_cost = (
        cache_read_tokens * input_cost_per_token * 0.1  # 10% of input cost
        + cache_write_tokens * input_cost_per_token * 0.5  # 50% of input cost
    )

    return round(base_cost + cache_cost, 6)
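
A worked example using the Sonnet rates above: 1,000 input tokens and 500 output tokens cost 1000 * 0.000003 + 500 * 0.000015 = 0.0105 USD before any cache adjustment:

generator = RealisticMockResponseGenerator()
cost = generator.calculate_realistic_cost(
    input_tokens=1000,
    output_tokens=500,
    model="claude-3-5-sonnet-20241022",
    cache_read_tokens=0,
    cache_write_tokens=0,
)
assert cost == 0.0105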

MetricsExtractor

Extract metrics from API responses.

extract_token_metrics staticmethod

extract_token_metrics(response_data, api_format)

Extract token usage from response data.

Source code in ccproxy/testing/response_handlers.py
@staticmethod
def extract_token_metrics(
    response_data: dict[str, Any], api_format: str
) -> dict[str, int | None]:
    """Extract token usage from response data."""
    if api_format == "openai":
        usage = response_data.get("usage", {})
        return {
            "input_tokens": usage.get("prompt_tokens"),
            "output_tokens": usage.get("completion_tokens"),
            "cache_read_tokens": None,  # OpenAI doesn't expose cache metrics
            "cache_write_tokens": None,
        }
    else:  # anthropic
        usage = response_data.get("usage", {})
        return {
            "input_tokens": usage.get("input_tokens"),
            "output_tokens": usage.get("output_tokens"),
            "cache_read_tokens": usage.get("cache_read_input_tokens"),
            "cache_write_tokens": usage.get("cache_creation_input_tokens"),
        }

extract_content staticmethod

extract_content(response_data, api_format)

Extract text content from response data.

Source code in ccproxy/testing/response_handlers.py
@staticmethod
def extract_content(response_data: dict[str, Any], api_format: str) -> str:
    """Extract text content from response data."""
    if api_format == "openai":
        content = (
            response_data.get("choices", [{}])[0]
            .get("message", {})
            .get("content", "")
        )
        return content if isinstance(content, str) else ""
    else:  # anthropic
        content = ""
        for block in response_data.get("content", []):
            if block.get("type") == "text":
                text = block.get("text", "")
                content += text if isinstance(text, str) else ""
        return content
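
A sketch extracting metrics from a hand-built Anthropic-style response body:

response_data = {
    "content": [{"type": "text", "text": "Hi there"}],
    "usage": {
        "input_tokens": 12,
        "output_tokens": 3,
        "cache_read_input_tokens": 0,
        "cache_creation_input_tokens": 0,
    },
}

tokens = MetricsExtractor.extract_token_metrics(response_data, "anthropic")
text = MetricsExtractor.extract_content(response_data, "anthropic")
# tokens["input_tokens"] == 12, tokens["cache_read_tokens"] == 0, text == "Hi there"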

ResponseHandler

Handle responses from both Anthropic and OpenAI formats.

process_response

process_response(response, scenario)

Process response based on format and streaming.

Source code in ccproxy/testing/response_handlers.py
def process_response(
    self, response: httpx.Response, scenario: RequestScenario
) -> dict[str, Any]:
    """Process response based on format and streaming."""

    if scenario.streaming:
        return self._process_streaming_response(response, scenario)
    else:
        return self._process_standard_response(response, scenario)

ScenarioGenerator

ScenarioGenerator(config)

Generate request scenarios based on traffic configuration.

Source code in ccproxy/testing/scenarios.py
def __init__(self, config: TrafficConfig):
    self.config = config

generate_scenarios

generate_scenarios()

Generate request scenarios based on configuration.

Source code in ccproxy/testing/scenarios.py
def generate_scenarios(self) -> list[RequestScenario]:
    """Generate request scenarios based on configuration."""
    total_requests = int(
        self.config.duration_seconds * self.config.requests_per_second
    )
    scenarios = []

    # Calculate timeframe
    start_time = self.config.start_timestamp or datetime.now(UTC)
    time_span = self.config.duration_seconds

    for i in range(total_requests):
        # Determine timing based on pattern
        time_offset = self._calculate_time_offset(i, total_requests, time_span)
        request_time = start_time + time_offset

        # Select random parameters
        model = random.choice(self.config.models)
        message_type = random.choice(self.config.message_types)
        streaming = random.random() < self.config.streaming_probability

        # Determine response type
        response_type = self._determine_response_type()

        # Determine API format based on distribution
        api_format = self._determine_api_format()

        # Set endpoint path based on format
        endpoint_path = (
            "/api/v1/chat/completions"
            if api_format == "openai"
            else "/api/v1/messages"
        )

        # Generate headers with bypass and format-specific headers
        headers = self._generate_headers(api_format, streaming)

        scenarios.append(
            RequestScenario(
                model=model,
                message_type=message_type,
                streaming=streaming,
                response_type=response_type,
                timestamp=request_time,
                api_format=api_format,
                endpoint_path=endpoint_path,
                bypass_upstream=self.config.bypass_mode,
                use_real_auth=not self.config.bypass_mode,
                headers=headers,
                target_url=self.config.target_url,
            )
        )

    return scenarios
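
A sketch of generating a small batch. The TrafficConfig keyword arguments mirror the fields generate_scenarios reads; any other fields are assumed to have defaults:

config = TrafficConfig(  # unspecified fields assumed to have defaults
    duration_seconds=10,
    requests_per_second=2.0,
    models=["claude-3-5-sonnet-20241022"],
    message_types=["short", "tool_use"],
    streaming_probability=0.5,
    bypass_mode=True,
    target_url="http://localhost:8000",
)

scenarios = ScenarioGenerator(config).generate_scenarios()
# 10 s * 2 req/s -> 20 scenarios spread across the configured time pattern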

TrafficPatternAnalyzer

Analyze and validate traffic patterns.

analyze_distribution staticmethod

analyze_distribution(scenarios)

Analyze the distribution of scenarios.

Source code in ccproxy/testing/scenarios.py
@staticmethod
def analyze_distribution(scenarios: list[RequestScenario]) -> dict[str, Any]:
    """Analyze the distribution of scenarios."""
    analysis = {
        "total_scenarios": len(scenarios),
        "api_format_distribution": {},
        "model_distribution": {},
        "message_type_distribution": {},
        "streaming_percentage": 0.0,
        "time_span_seconds": 0.0,
    }

    if not scenarios:
        return analysis

    # Count distributions
    api_formats: dict[str, int] = {}
    models: dict[str, int] = {}
    message_types: dict[str, int] = {}
    streaming_count = 0

    for scenario in scenarios:
        # API format distribution
        api_formats[scenario.api_format] = (
            api_formats.get(scenario.api_format, 0) + 1
        )

        # Model distribution
        models[scenario.model] = models.get(scenario.model, 0) + 1

        # Message type distribution
        message_types[scenario.message_type] = (
            message_types.get(scenario.message_type, 0) + 1
        )

        # Streaming count
        if scenario.streaming:
            streaming_count += 1

    # Calculate percentages
    total = len(scenarios)
    analysis["api_format_distribution"] = {
        k: v / total for k, v in api_formats.items()
    }
    analysis["model_distribution"] = {k: v / total for k, v in models.items()}
    analysis["message_type_distribution"] = {
        k: v / total for k, v in message_types.items()
    }
    analysis["streaming_percentage"] = streaming_count / total

    # Calculate time span
    timestamps = [scenario.timestamp for scenario in scenarios]
    if timestamps:
        analysis["time_span_seconds"] = (
            max(timestamps) - min(timestamps)
        ).total_seconds()

    return analysis
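
A sketch that validates the generated mix, reusing the scenarios from the previous example:

analysis = TrafficPatternAnalyzer.analyze_distribution(scenarios)

print(analysis["total_scenarios"])          # e.g. 20
print(analysis["api_format_distribution"])  # e.g. {"anthropic": 0.55, "openai": 0.45}
print(analysis["streaming_percentage"])     # fraction of streaming scenarios (0.0-1.0)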