
ccproxy.testing.scenarios

Scenario generation and traffic pattern utilities.

ScenarioGenerator

ScenarioGenerator(config)

Generate request scenarios based on traffic configuration.

Source code in ccproxy/testing/scenarios.py
def __init__(self, config: TrafficConfig):
    self.config = config
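
A minimal construction sketch, assuming TrafficConfig is importable from ccproxy.testing (adjust the import to your installation) and exposes the fields referenced in the source below; the model names, message types, and URL are placeholder values, not values mandated by the library:

from datetime import UTC, datetime

from ccproxy.testing import TrafficConfig  # hypothetical import path
from ccproxy.testing.scenarios import ScenarioGenerator

config = TrafficConfig(
    duration_seconds=10,
    requests_per_second=2.0,
    models=["claude-3-5-sonnet"],          # placeholder model list
    message_types=["simple", "tool_use"],  # placeholder message types
    streaming_probability=0.5,
    bypass_mode=True,                      # bypass upstream instead of using real auth
    target_url="http://localhost:8000",    # placeholder target
    start_timestamp=datetime.now(UTC),
)
generator = ScenarioGenerator(config)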

generate_scenarios

generate_scenarios()

Generate request scenarios based on configuration.

Source code in ccproxy/testing/scenarios.py
def generate_scenarios(self) -> list[RequestScenario]:
    """Generate request scenarios based on configuration."""
    total_requests = int(
        self.config.duration_seconds * self.config.requests_per_second
    )
    scenarios = []

    # Calculate timeframe
    start_time = self.config.start_timestamp or datetime.now(UTC)
    time_span = self.config.duration_seconds

    for i in range(total_requests):
        # Determine timing based on pattern
        time_offset = self._calculate_time_offset(i, total_requests, time_span)
        request_time = start_time + time_offset

        # Select random parameters
        model = random.choice(self.config.models)
        message_type = random.choice(self.config.message_types)
        streaming = random.random() < self.config.streaming_probability

        # Determine response type
        response_type = self._determine_response_type()

        # Determine API format based on distribution
        api_format = self._determine_api_format()

        # Set endpoint path based on format
        endpoint_path = (
            "/api/v1/chat/completions"
            if api_format == "openai"
            else "/api/v1/messages"
        )

        # Generate headers with bypass and format-specific headers
        headers = self._generate_headers(api_format, streaming)

        scenarios.append(
            RequestScenario(
                model=model,
                message_type=message_type,
                streaming=streaming,
                response_type=response_type,
                timestamp=request_time,
                api_format=api_format,
                endpoint_path=endpoint_path,
                bypass_upstream=self.config.bypass_mode,
                use_real_auth=not self.config.bypass_mode,
                headers=headers,
                target_url=self.config.target_url,
            )
        )

    return scenarios
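
Continuing the sketch above, generating scenarios from that config would yield duration_seconds * requests_per_second entries (here 10 s at 2 req/s, so 20), each carrying the randomly selected model, message type, API format, and endpoint path:

scenarios = generator.generate_scenarios()

print(len(scenarios))  # 20 for the example config above
first = scenarios[0]
print(first.api_format, first.endpoint_path, first.streaming)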

TrafficPatternAnalyzer

Analyze and validate traffic patterns.

analyze_distribution staticmethod

analyze_distribution(scenarios)

Analyze the distribution of scenarios.

Source code in ccproxy/testing/scenarios.py
@staticmethod
def analyze_distribution(scenarios: list[RequestScenario]) -> dict[str, Any]:
    """Analyze the distribution of scenarios."""
    analysis = {
        "total_scenarios": len(scenarios),
        "api_format_distribution": {},
        "model_distribution": {},
        "message_type_distribution": {},
        "streaming_percentage": 0.0,
        "time_span_seconds": 0.0,
    }

    if not scenarios:
        return analysis

    # Count distributions
    api_formats: dict[str, int] = {}
    models: dict[str, int] = {}
    message_types: dict[str, int] = {}
    streaming_count = 0

    for scenario in scenarios:
        # API format distribution
        api_formats[scenario.api_format] = (
            api_formats.get(scenario.api_format, 0) + 1
        )

        # Model distribution
        models[scenario.model] = models.get(scenario.model, 0) + 1

        # Message type distribution
        message_types[scenario.message_type] = (
            message_types.get(scenario.message_type, 0) + 1
        )

        # Streaming count
        if scenario.streaming:
            streaming_count += 1

    # Calculate percentages
    total = len(scenarios)
    analysis["api_format_distribution"] = {
        k: v / total for k, v in api_formats.items()
    }
    analysis["model_distribution"] = {k: v / total for k, v in models.items()}
    analysis["message_type_distribution"] = {
        k: v / total for k, v in message_types.items()
    }
    analysis["streaming_percentage"] = streaming_count / total

    # Calculate time span
    timestamps = [scenario.timestamp for scenario in scenarios]
    if timestamps:
        analysis["time_span_seconds"] = (
            max(timestamps) - min(timestamps)
        ).total_seconds()

    return analysis
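
A short follow-up sketch using the scenarios generated above; note that despite the key name, streaming_percentage is a fraction between 0.0 and 1.0, as are the per-key values in the distribution dictionaries:

from ccproxy.testing.scenarios import TrafficPatternAnalyzer

analysis = TrafficPatternAnalyzer.analyze_distribution(scenarios)
print(analysis["total_scenarios"])
print(analysis["api_format_distribution"])  # e.g. {"openai": 0.45, "anthropic": 0.55}
print(analysis["streaming_percentage"])     # fraction of streaming scenarios
print(analysis["time_span_seconds"])        # spread between first and last timestamp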