ccproxy.plugins.claude_api.hooks

Claude API plugin hooks for streaming metrics extraction.

ClaudeAPIStreamingMetricsHook

ClaudeAPIStreamingMetricsHook(
    pricing_service=None, plugin_registry=None
)

Bases: Hook

Hook to extract and accumulate metrics from Claude API streaming responses.

Parameters:

Name             Type  Description                                             Default
pricing_service  Any   Direct pricing service instance (if available at init)  None
plugin_registry  Any   Plugin registry to get pricing service lazily           None
Source code in ccproxy/plugins/claude_api/hooks.py
def __init__(
    self, pricing_service: Any = None, plugin_registry: Any = None
) -> None:
    """Initialize with optional pricing service for cost calculation.

    Args:
        pricing_service: Direct pricing service instance (if available at init)
        plugin_registry: Plugin registry to get pricing service lazily
    """
    self.pricing_service = pricing_service
    self.plugin_registry = plugin_registry
    # Store metrics per request_id
    self._metrics_cache: dict[str, dict[str, Any]] = {}
    # Incremental SSE parsers keyed by request
    self._sse_parsers: dict[str, SSEStreamParser] = {}
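
The constructor above sets up two ideas: a pricing service that is either passed in directly or looked up later through the plugin registry, and per-request state keyed by request ID. The following is a minimal sketch of that pattern, not the plugin's actual implementation. `MetricsHookSketch`, `FakeRegistry`, `get_service`, `resolve_pricing`, `accumulate`, and `finish` are all hypothetical names introduced for illustration, and summing counts across chunks is an assumption about how accumulation might work.

```python
from typing import Any


class FakeRegistry:
    """Hypothetical registry; get_service() is a made-up lookup method."""

    def __init__(self, services: dict[str, Any]) -> None:
        self._services = services

    def get_service(self, name: str) -> Any:
        return self._services.get(name)


class MetricsHookSketch:
    """Illustrative stand-in for the hook's constructor state (assumed behavior)."""

    def __init__(
        self, pricing_service: Any = None, plugin_registry: Any = None
    ) -> None:
        self.pricing_service = pricing_service
        self.plugin_registry = plugin_registry
        # One metrics dict per request_id, as in the real constructor
        self._metrics_cache: dict[str, dict[str, Any]] = {}

    def resolve_pricing(self) -> Any:
        # Prefer the instance given at init; otherwise ask the registry on first use.
        if self.pricing_service is None and self.plugin_registry is not None:
            self.pricing_service = self.plugin_registry.get_service("pricing")
        return self.pricing_service

    def accumulate(self, request_id: str, **counts: int) -> dict[str, Any]:
        # Sum counts from successive stream chunks into the request's entry.
        metrics = self._metrics_cache.setdefault(request_id, {})
        for key, value in counts.items():
            metrics[key] = metrics.get(key, 0) + value
        return metrics

    def finish(self, request_id: str) -> dict[str, Any]:
        # Remove per-request state when the stream ends so the cache stays bounded.
        return self._metrics_cache.pop(request_id, {})
```

Keying both caches by request ID lets a single hook instance serve concurrent streams without mixing their counters. Removing the entry when a stream finishes matters for a long-running proxy, since entries that are never cleared would accumulate for the life of the process.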