Skip to content

ccproxy.utils

ccproxy.utils

Utility modules for shared functionality across the application.

calculate_cost_breakdown

calculate_cost_breakdown(
    tokens_input,
    tokens_output,
    model,
    cache_read_tokens=None,
    cache_write_tokens=None,
)

Calculate detailed cost breakdown for the given token usage.

Parameters:

Name Type Description Default
tokens_input int | None

Number of input tokens

required
tokens_output int | None

Number of output tokens

required
model str | None

Model name for pricing lookup

required
cache_read_tokens int | None

Number of cache read tokens

None
cache_write_tokens int | None

Number of cache write tokens

None

Returns:

Type Description
dict[str, float | str] | None

Dictionary with cost breakdown or None if calculation not possible

Source code in ccproxy/utils/cost_calculator.py
def calculate_cost_breakdown(
    tokens_input: int | None,
    tokens_output: int | None,
    model: str | None,
    cache_read_tokens: int | None = None,
    cache_write_tokens: int | None = None,
) -> dict[str, float | str] | None:
    """Calculate detailed cost breakdown for the given token usage.

    Args:
        tokens_input: Number of input tokens
        tokens_output: Number of output tokens
        model: Model name for pricing lookup
        cache_read_tokens: Number of cache read tokens
        cache_write_tokens: Number of cache write tokens

    Returns:
        Dictionary with cost breakdown or None if calculation not possible
    """
    if not model or (
        not tokens_input
        and not tokens_output
        and not cache_read_tokens
        and not cache_write_tokens
    ):
        return None

    try:
        # Import pricing system components
        from ccproxy.config.pricing import PricingSettings
        from ccproxy.pricing.cache import PricingCache
        from ccproxy.pricing.loader import PricingLoader

        # Get canonical model name
        canonical_model = PricingLoader.get_canonical_model_name(model)

        # Create pricing components with dependency injection
        settings = PricingSettings()
        cache = PricingCache(settings)
        cached_data = cache.load_cached_data()

        # If cache is expired, try to use stale cache as fallback
        if not cached_data:
            try:
                import json

                if cache.cache_file.exists():
                    with cache.cache_file.open(encoding="utf-8") as f:
                        cached_data = json.load(f)
                    logger.debug(
                        "cost_breakdown_using_stale_cache",
                        cache_age_hours=cache.get_cache_info().get("age_hours"),
                    )
            except (OSError, json.JSONDecodeError):
                pass

        if not cached_data:
            return None

        # Load pricing data
        pricing_data = PricingLoader.load_pricing_from_data(cached_data, verbose=False)
        if not pricing_data or canonical_model not in pricing_data:
            return None

        model_pricing = pricing_data[canonical_model]

        # Calculate individual costs (pricing is per 1M tokens)
        input_cost = ((tokens_input or 0) / 1_000_000) * float(model_pricing.input)
        output_cost = ((tokens_output or 0) / 1_000_000) * float(model_pricing.output)
        cache_read_cost = ((cache_read_tokens or 0) / 1_000_000) * float(
            model_pricing.cache_read
        )
        cache_write_cost = ((cache_write_tokens or 0) / 1_000_000) * float(
            model_pricing.cache_write
        )

        total_cost = input_cost + output_cost + cache_read_cost + cache_write_cost

        return {
            "input_cost": input_cost,
            "output_cost": output_cost,
            "cache_read_cost": cache_read_cost,
            "cache_write_cost": cache_write_cost,
            "total_cost": total_cost,
            "model": canonical_model,
        }

    except Exception as e:
        logger.debug("cost_breakdown_error", error=str(e), model=model)
        return None

calculate_token_cost

calculate_token_cost(
    tokens_input,
    tokens_output,
    model,
    cache_read_tokens=None,
    cache_write_tokens=None,
)

Calculate cost in USD for the given token usage including cache tokens.

This is a shared utility function that provides consistent cost calculation across all services using the pricing data from the pricing system.

Parameters:

Name Type Description Default
tokens_input int | None

Number of input tokens

required
tokens_output int | None

Number of output tokens

required
model str | None

Model name for pricing lookup

required
cache_read_tokens int | None

Number of cache read tokens

None
cache_write_tokens int | None

Number of cache write tokens

None

Returns:

Type Description
float | None

Cost in USD or None if calculation not possible

Source code in ccproxy/utils/cost_calculator.py
def calculate_token_cost(
    tokens_input: int | None,
    tokens_output: int | None,
    model: str | None,
    cache_read_tokens: int | None = None,
    cache_write_tokens: int | None = None,
) -> float | None:
    """Calculate cost in USD for the given token usage including cache tokens.

    This is a shared utility function that provides consistent cost calculation
    across all services using the pricing data from the pricing system.

    Args:
        tokens_input: Number of input tokens
        tokens_output: Number of output tokens
        model: Model name for pricing lookup
        cache_read_tokens: Number of cache read tokens
        cache_write_tokens: Number of cache write tokens

    Returns:
        Cost in USD or None if calculation not possible
    """
    if not model or (
        not tokens_input
        and not tokens_output
        and not cache_read_tokens
        and not cache_write_tokens
    ):
        return None

    try:
        # Import pricing system components
        from ccproxy.config.pricing import PricingSettings
        from ccproxy.pricing.cache import PricingCache
        from ccproxy.pricing.loader import PricingLoader

        # Get canonical model name
        canonical_model = PricingLoader.get_canonical_model_name(model)

        # Create pricing components with dependency injection
        settings = PricingSettings()
        cache = PricingCache(settings)
        cached_data = cache.load_cached_data()

        # If cache is expired, try to use stale cache as fallback
        if not cached_data:
            try:
                import json

                if cache.cache_file.exists():
                    with cache.cache_file.open(encoding="utf-8") as f:
                        cached_data = json.load(f)
                    logger.debug(
                        "cost_calculation_using_stale_cache",
                        cache_age_hours=cache.get_cache_info().get("age_hours"),
                    )
            except (OSError, json.JSONDecodeError):
                pass

        if not cached_data:
            logger.debug("cost_calculation_skipped", reason="no_pricing_data")
            return None

        # Load pricing data
        pricing_data = PricingLoader.load_pricing_from_data(cached_data, verbose=False)
        if not pricing_data or canonical_model not in pricing_data:
            logger.debug(
                "cost_calculation_skipped",
                model=canonical_model,
                reason="model_not_found",
            )
            return None

        model_pricing = pricing_data[canonical_model]

        # Calculate cost (pricing is per 1M tokens)
        input_cost = ((tokens_input or 0) / 1_000_000) * float(model_pricing.input)
        output_cost = ((tokens_output or 0) / 1_000_000) * float(model_pricing.output)
        cache_read_cost = ((cache_read_tokens or 0) / 1_000_000) * float(
            model_pricing.cache_read
        )
        cache_write_cost = ((cache_write_tokens or 0) / 1_000_000) * float(
            model_pricing.cache_write
        )

        total_cost = input_cost + output_cost + cache_read_cost + cache_write_cost

        logger.debug(
            "cost_calculated",
            model=canonical_model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            cache_read_tokens=cache_read_tokens,
            cache_write_tokens=cache_write_tokens,
            input_cost=input_cost,
            output_cost=output_cost,
            cache_read_cost=cache_read_cost,
            cache_write_cost=cache_write_cost,
            cost_usd=total_cost,
        )

        return total_cost

    except Exception as e:
        logger.debug("cost_calculation_error", error=str(e), model=model)
        return None

monitor_disconnection async

monitor_disconnection(request, session_id, claude_service)

Monitor for client disconnection and interrupt session if detected.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request

required
session_id str

The Claude SDK session ID to interrupt if disconnected

required
claude_service ClaudeSDKService

The Claude SDK service instance

required
Source code in ccproxy/utils/disconnection_monitor.py
async def monitor_disconnection(
    request: Request, session_id: str, claude_service: "ClaudeSDKService"
) -> None:
    """Monitor for client disconnection and interrupt session if detected.

    Args:
        request: The incoming HTTP request
        session_id: The Claude SDK session ID to interrupt if disconnected
        claude_service: The Claude SDK service instance
    """
    try:
        while True:
            # Poll the transport once per second.
            await asyncio.sleep(1.0)
            if not await request.is_disconnected():
                continue
            logger.info(
                "client_disconnected_interrupting_session", session_id=session_id
            )
            # The interrupt itself is best-effort: log failures but never raise.
            try:
                await claude_service.sdk_client.interrupt_session(session_id)
            except Exception as e:
                logger.error(
                    "failed_to_interrupt_session",
                    session_id=session_id,
                    error=str(e),
                )
            return
    except asyncio.CancelledError:
        # Expected shutdown path: the caller cancels us when streaming
        # completes normally. Re-raise so cancellation propagates.
        logger.debug("disconnection_monitor_cancelled", session_id=session_id)
        raise

monitor_stuck_stream async

monitor_stuck_stream(
    session_id,
    claude_service,
    first_chunk_event,
    timeout=10.0,
)

Monitor for stuck streams that don't produce a first chunk (SystemMessage).

Parameters:

Name Type Description Default
session_id str

The Claude SDK session ID to monitor

required
claude_service ClaudeSDKService

The Claude SDK service instance

required
first_chunk_event Event

Event that will be set when first chunk is received

required
timeout float

Seconds to wait for first chunk before considering stream stuck

10.0
Source code in ccproxy/utils/disconnection_monitor.py
async def monitor_stuck_stream(
    session_id: str,
    claude_service: "ClaudeSDKService",
    first_chunk_event: asyncio.Event,
    timeout: float = 10.0,
) -> None:
    """Monitor for stuck streams that don't produce a first chunk (SystemMessage).

    Args:
        session_id: The Claude SDK session ID to monitor
        claude_service: The Claude SDK service instance
        first_chunk_event: Event that will be set when first chunk is received
        timeout: Seconds to wait for first chunk before considering stream stuck
    """
    try:
        # Block until the first chunk arrives, or the deadline passes.
        await asyncio.wait_for(first_chunk_event.wait(), timeout=timeout)
    except TimeoutError:
        # No first chunk in time: treat the stream as stuck and interrupt it.
        logger.error(
            "streaming_system_message_timeout",
            session_id=session_id,
            timeout=timeout,
            message=f"No SystemMessage received within {timeout}s, interrupting session",
        )
        try:
            await claude_service.sdk_client.interrupt_session(session_id)
        except Exception as e:
            logger.error(
                "failed_to_interrupt_stuck_session", session_id=session_id, error=str(e)
            )
        else:
            logger.info("stuck_session_interrupted_successfully", session_id=session_id)
    except asyncio.CancelledError:
        # Expected when streaming completes normally and the caller cancels us.
        logger.debug("stuck_stream_monitor_cancelled", session_id=session_id)
        raise
    else:
        logger.debug("stuck_stream_first_chunk_received", session_id=session_id)

generate_client_id

generate_client_id()

Generate a consistent client ID for SDK connections.

Returns:

Name Type Description
str str

First part of a UUID4 (8 characters)

Source code in ccproxy/utils/id_generator.py
def generate_client_id() -> str:
    """Generate a consistent client ID for SDK connections.

    Returns:
        str: First part of a UUID4 (8 characters)
    """
    # The first hyphen-delimited group of a UUID string is exactly the first
    # 8 hex digits, so slicing the undashed hex form yields the same value.
    return uuid.uuid4().hex[:8]