Skip to content

ccproxy.observability

ccproxy.observability

Observability module for the CCProxy API.

This module provides comprehensive observability capabilities including metrics collection, structured logging, request context tracking, and observability pipeline management.

The observability system follows a hybrid architecture that combines: - Real-time metrics collection and aggregation - Structured logging with correlation IDs - Request context propagation across service boundaries - Pluggable pipeline for metrics export and alerting

Components: - metrics: Core metrics collection, aggregation, and export functionality - logging: Structured logging configuration and context-aware loggers - context: Request context tracking and correlation across async operations - pipeline: Observability data pipeline for metrics export and alerting

RequestContext dataclass

RequestContext(
    request_id,
    start_time,
    logger,
    metadata=dict(),
    storage=None,
)

Context object for tracking request state and metadata.

Provides access to request ID, timing information, and structured logger with automatically injected context.

duration_ms property

duration_ms

Get current duration in milliseconds.

duration_seconds property

duration_seconds

Get current duration in seconds.

add_metadata

add_metadata(**kwargs)

Add metadata to the request context.

Source code in ccproxy/observability/context.py
def add_metadata(self, **kwargs: Any) -> None:
    """Add metadata to the request context."""
    self.metadata.update(kwargs)
    # Update logger context
    self.logger = self.logger.bind(**kwargs)

log_event

log_event(event, **kwargs)

Log an event with current context and timing.

Source code in ccproxy/observability/context.py
def log_event(self, event: str, **kwargs: Any) -> None:
    """Log an event with current context and timing."""
    self.logger.info(
        event, request_id=self.request_id, duration_ms=self.duration_ms, **kwargs
    )

PrometheusMetrics

PrometheusMetrics(
    namespace="ccproxy",
    registry=None,
    pushgateway_client=None,
)

Prometheus metrics collector for operational monitoring.

Provides thread-safe, high-performance metrics collection using prometheus_client. Designed for minimal overhead in request processing hot paths.

Parameters:

Name Type Description Default
namespace str

Metric name prefix

'ccproxy'
registry CollectorRegistry | None

Custom Prometheus registry (uses default if None)

None
pushgateway_client Any | None

Optional pushgateway client for dependency injection

None
Source code in ccproxy/observability/metrics.py
def __init__(
    self,
    namespace: str = "ccproxy",
    registry: CollectorRegistry | None = None,
    pushgateway_client: Any | None = None,
):
    """
    Initialize Prometheus metrics.

    Args:
        namespace: Metric name prefix
        registry: Custom Prometheus registry (uses default if None)
        pushgateway_client: Optional pushgateway client for dependency injection
    """
    if not PROMETHEUS_AVAILABLE:
        logger.warning(
            "prometheus_client not available. Metrics will be disabled. "
            "Install with: pip install prometheus-client"
        )

    self.namespace = namespace
    # Use default registry if None is passed
    if registry is None and PROMETHEUS_AVAILABLE:
        from prometheus_client import REGISTRY

        self.registry: CollectorRegistry | None = REGISTRY
    else:
        self.registry = registry
    self._enabled = PROMETHEUS_AVAILABLE
    self._pushgateway_client = pushgateway_client

    if self._enabled:
        self._init_metrics()
        # Initialize pushgateway client if not provided via DI
        if self._pushgateway_client is None:
            self._init_pushgateway()

record_request

record_request(
    method,
    endpoint,
    model=None,
    status="unknown",
    service_type=None,
)

Record a request event.

Parameters:

Name Type Description Default
method str

HTTP method (GET, POST, etc.)

required
endpoint str

API endpoint path

required
model str | None

Model name used

None
status str | int

Response status code or status string

'unknown'
service_type str | None

Service type (claude_sdk_service, proxy_service)

None
Source code in ccproxy/observability/metrics.py
def record_request(
    self,
    method: str,
    endpoint: str,
    model: str | None = None,
    status: str | int = "unknown",
    service_type: str | None = None,
) -> None:
    """
    Record a request event.

    Args:
        method: HTTP method (GET, POST, etc.)
        endpoint: API endpoint path
        model: Model name used
        status: Response status code or status string
        service_type: Service type (claude_sdk_service, proxy_service)
    """
    if not self._enabled:
        return

    self.request_counter.labels(
        method=method,
        endpoint=endpoint,
        model=model or "unknown",
        status=str(status),
        service_type=service_type or "unknown",
    ).inc()

record_response_time

record_response_time(
    duration_seconds,
    model=None,
    endpoint="unknown",
    service_type=None,
)

Record response time.

Parameters:

Name Type Description Default
duration_seconds float

Response time in seconds

required
model str | None

Model name used

None
endpoint str

API endpoint

'unknown'
service_type str | None

Service type (claude_sdk_service, proxy_service)

None
Source code in ccproxy/observability/metrics.py
def record_response_time(
    self,
    duration_seconds: float,
    model: str | None = None,
    endpoint: str = "unknown",
    service_type: str | None = None,
) -> None:
    """
    Record response time.

    Args:
        duration_seconds: Response time in seconds
        model: Model name used
        endpoint: API endpoint
        service_type: Service type (claude_sdk_service, proxy_service)
    """
    if not self._enabled:
        return

    self.response_time.labels(
        model=model or "unknown",
        endpoint=endpoint,
        service_type=service_type or "unknown",
    ).observe(duration_seconds)

record_tokens

record_tokens(
    token_count, token_type, model=None, service_type=None
)

Record token usage.

Parameters:

Name Type Description Default
token_count int

Number of tokens

required
token_type str

Type of tokens (input, output, cache_read, cache_write)

required
model str | None

Model name

None
service_type str | None

Service type (claude_sdk_service, proxy_service)

None
Source code in ccproxy/observability/metrics.py
def record_tokens(
    self,
    token_count: int,
    token_type: str,
    model: str | None = None,
    service_type: str | None = None,
) -> None:
    """
    Record token usage.

    Args:
        token_count: Number of tokens
        token_type: Type of tokens (input, output, cache_read, cache_write)
        model: Model name
        service_type: Service type (claude_sdk_service, proxy_service)
    """
    if not self._enabled or token_count <= 0:
        return

    self.token_counter.labels(
        type=token_type,
        model=model or "unknown",
        service_type=service_type or "unknown",
    ).inc(token_count)

record_cost

record_cost(
    cost_usd,
    model=None,
    cost_type="total",
    service_type=None,
)

Record cost.

Parameters:

Name Type Description Default
cost_usd float

Cost in USD

required
model str | None

Model name

None
cost_type str

Type of cost (input, output, cache, total)

'total'
service_type str | None

Service type (claude_sdk_service, proxy_service)

None
Source code in ccproxy/observability/metrics.py
def record_cost(
    self,
    cost_usd: float,
    model: str | None = None,
    cost_type: str = "total",
    service_type: str | None = None,
) -> None:
    """
    Record cost.

    Args:
        cost_usd: Cost in USD
        model: Model name
        cost_type: Type of cost (input, output, cache, total)
        service_type: Service type (claude_sdk_service, proxy_service)
    """
    if not self._enabled or cost_usd <= 0:
        return

    self.cost_counter.labels(
        model=model or "unknown",
        cost_type=cost_type,
        service_type=service_type or "unknown",
    ).inc(cost_usd)

record_error

record_error(
    error_type,
    endpoint="unknown",
    model=None,
    service_type=None,
)

Record an error event.

Parameters:

Name Type Description Default
error_type str

Type/name of error

required
endpoint str

API endpoint where error occurred

'unknown'
model str | None

Model name if applicable

None
service_type str | None

Service type (claude_sdk_service, proxy_service)

None
Source code in ccproxy/observability/metrics.py
def record_error(
    self,
    error_type: str,
    endpoint: str = "unknown",
    model: str | None = None,
    service_type: str | None = None,
) -> None:
    """
    Record an error event.

    Args:
        error_type: Type/name of error
        endpoint: API endpoint where error occurred
        model: Model name if applicable
        service_type: Service type (claude_sdk_service, proxy_service)
    """
    if not self._enabled:
        return

    self.error_counter.labels(
        error_type=error_type,
        endpoint=endpoint,
        model=model or "unknown",
        service_type=service_type or "unknown",
    ).inc()

set_active_requests

set_active_requests(count)

Set the current number of active requests.

Parameters:

Name Type Description Default
count int

Number of active requests

required
Source code in ccproxy/observability/metrics.py
def set_active_requests(self, count: int) -> None:
    """
    Set the current number of active requests.

    Args:
        count: Number of active requests
    """
    if not self._enabled:
        return

    self.active_requests.set(count)

inc_active_requests

inc_active_requests()

Increment active request counter.

Source code in ccproxy/observability/metrics.py
def inc_active_requests(self) -> None:
    """Increment active request counter."""
    if not self._enabled:
        return

    self.active_requests.inc()

dec_active_requests

dec_active_requests()

Decrement active request counter.

Source code in ccproxy/observability/metrics.py
def dec_active_requests(self) -> None:
    """Decrement active request counter."""
    if not self._enabled:
        return

    self.active_requests.dec()

update_system_info

update_system_info(info)

Update system information.

Parameters:

Name Type Description Default
info dict[str, str]

Dictionary of system information key-value pairs

required
Source code in ccproxy/observability/metrics.py
def update_system_info(self, info: dict[str, str]) -> None:
    """
    Update system information.

    Args:
        info: Dictionary of system information key-value pairs
    """
    if not self._enabled:
        return

    self.system_info.info(info)

is_enabled

is_enabled()

Check if metrics collection is enabled.

Source code in ccproxy/observability/metrics.py
def is_enabled(self) -> bool:
    """Check if metrics collection is enabled."""
    return self._enabled

push_to_gateway

push_to_gateway(method='push')

Push current metrics to Pushgateway using official prometheus_client methods.

Parameters:

Name Type Description Default
method str

Push method - "push" (replace), "pushadd" (add), or "delete"

'push'

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/metrics.py
def push_to_gateway(self, method: str = "push") -> bool:
    """
    Push current metrics to Pushgateway using official prometheus_client methods.

    Args:
        method: Push method - "push" (replace), "pushadd" (add), or "delete"

    Returns:
        True if push succeeded, False otherwise
    """

    if not self._enabled or not self._pushgateway_client:
        return False

    result = self._pushgateway_client.push_metrics(self.registry, method)
    return bool(result)

push_add_to_gateway

push_add_to_gateway()

Add current metrics to existing job/instance in Pushgateway (pushadd operation).

This is useful when you want to add metrics without replacing existing ones.

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/metrics.py
def push_add_to_gateway(self) -> bool:
    """
    Add current metrics to existing job/instance in Pushgateway (pushadd operation).

    This is useful when you want to add metrics without replacing existing ones.

    Returns:
        True if push succeeded, False otherwise
    """
    return self.push_to_gateway(method="pushadd")

delete_from_gateway

delete_from_gateway()

Delete all metrics for the configured job from Pushgateway.

This removes all metrics associated with the job, useful for cleanup.

Returns:

Type Description
bool

True if delete succeeded, False otherwise

Source code in ccproxy/observability/metrics.py
def delete_from_gateway(self) -> bool:
    """
    Delete all metrics for the configured job from Pushgateway.

    This removes all metrics associated with the job, useful for cleanup.

    Returns:
        True if delete succeeded, False otherwise
    """

    if not self._enabled or not self._pushgateway_client:
        return False

    result = self._pushgateway_client.delete_metrics()
    return bool(result)

is_pushgateway_enabled

is_pushgateway_enabled()

Check if Pushgateway client is enabled and configured.

Source code in ccproxy/observability/metrics.py
def is_pushgateway_enabled(self) -> bool:
    """Check if Pushgateway client is enabled and configured."""
    return (
        self._pushgateway_client is not None
        and self._pushgateway_client.is_enabled()
    )

PushgatewayClient

PushgatewayClient(settings)

Prometheus Pushgateway client using official prometheus_client methods.

Supports standard pushgateway operations: - push_to_gateway(): Replace all metrics for job/instance - pushadd_to_gateway(): Add metrics to existing job/instance - delete_from_gateway(): Delete metrics for job/instance

Also supports VictoriaMetrics remote write protocol for compatibility.

Parameters:

Name Type Description Default
settings ObservabilitySettings

Observability configuration settings

required
Source code in ccproxy/observability/pushgateway.py
def __init__(self, settings: ObservabilitySettings) -> None:
    """Initialize Pushgateway client.

    Args:
        settings: Observability configuration settings
    """
    self.settings = settings
    # Pushgateway is enabled if URL is configured and prometheus_client is available
    self._enabled = PROMETHEUS_AVAILABLE and bool(settings.pushgateway_url)
    self._circuit_breaker = CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60.0,
    )

    # Only log if pushgateway URL is configured but prometheus is not available
    if settings.pushgateway_url and not PROMETHEUS_AVAILABLE:
        logger.warning(
            "prometheus_client not available. Pushgateway will be disabled. "
            "Install with: pip install prometheus-client"
        )

push_metrics

push_metrics(registry, method='push')

Push metrics to Pushgateway using official prometheus_client methods.

Parameters:

Name Type Description Default
registry CollectorRegistry

Prometheus metrics registry to push

required
method str

Push method - "push" (replace), "pushadd" (add), or "delete"

'push'

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def push_metrics(self, registry: CollectorRegistry, method: str = "push") -> bool:
    """Push metrics to Pushgateway using official prometheus_client methods.

    Args:
        registry: Prometheus metrics registry to push
        method: Push method - "push" (replace), "pushadd" (add), or "delete"

    Returns:
        True if push succeeded, False otherwise
    """

    if not self._enabled or not self.settings.pushgateway_url:
        return False

    # Check circuit breaker before attempting operation
    if not self._circuit_breaker.can_execute():
        logger.debug(
            "pushgateway_circuit_breaker_blocking",
            state=self._circuit_breaker.state,
            failure_count=self._circuit_breaker.failure_count,
        )
        return False

    try:
        # Check if URL looks like VictoriaMetrics remote write endpoint
        if "/api/v1/write" in self.settings.pushgateway_url:
            success = self._push_remote_write(registry)
        else:
            success = self._push_standard(registry, method)

        if success:
            self._circuit_breaker.record_success()
        else:
            self._circuit_breaker.record_failure()

        return success

    except Exception as e:
        self._circuit_breaker.record_failure()
        logger.error(
            "pushgateway_push_failed",
            url=self.settings.pushgateway_url,
            job=self.settings.pushgateway_job,
            method=method,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

push_add_metrics

push_add_metrics(registry)

Add metrics to existing job/instance (pushadd operation).

Parameters:

Name Type Description Default
registry CollectorRegistry

Prometheus metrics registry to add

required

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def push_add_metrics(self, registry: CollectorRegistry) -> bool:
    """Add metrics to existing job/instance (pushadd operation).

    Args:
        registry: Prometheus metrics registry to add

    Returns:
        True if push succeeded, False otherwise
    """
    return self.push_metrics(registry, method="pushadd")

delete_metrics

delete_metrics()

Delete all metrics for the configured job.

Returns:

Type Description
bool

True if delete succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def delete_metrics(self) -> bool:
    """Delete all metrics for the configured job.

    Returns:
        True if delete succeeded, False otherwise
    """

    if not self._enabled or not self.settings.pushgateway_url:
        return False

    # Check circuit breaker before attempting operation
    if not self._circuit_breaker.can_execute():
        logger.debug(
            "pushgateway_circuit_breaker_blocking_delete",
            state=self._circuit_breaker.state,
            failure_count=self._circuit_breaker.failure_count,
        )
        return False

    try:
        # Only standard pushgateway supports delete operation
        if "/api/v1/write" in self.settings.pushgateway_url:
            logger.warning("pushgateway_delete_not_supported_for_remote_write")
            return False
        else:
            success = self._push_standard(None, method="delete")  # type: ignore[arg-type]

            if success:
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()

            return success

    except Exception as e:
        self._circuit_breaker.record_failure()
        logger.error(
            "pushgateway_delete_failed",
            url=self.settings.pushgateway_url,
            job=self.settings.pushgateway_job,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

is_enabled

is_enabled()

Check if Pushgateway client is enabled and configured.

Source code in ccproxy/observability/pushgateway.py
def is_enabled(self) -> bool:
    """Check if Pushgateway client is enabled and configured."""
    return self._enabled and bool(self.settings.pushgateway_url)

get_context_tracker

get_context_tracker()

Get or create global context tracker.

Source code in ccproxy/observability/context.py
def get_context_tracker() -> ContextTracker:
    """Get or create global context tracker."""
    global _global_tracker

    if _global_tracker is None:
        _global_tracker = ContextTracker()

    return _global_tracker

request_context async

request_context(
    request_id=None,
    storage=None,
    metrics=None,
    **initial_context,
)

Context manager for tracking complete request lifecycle with timing.

Automatically logs request start/success/error events with accurate timing. Provides structured logging with request correlation.

Parameters:

Name Type Description Default
request_id str | None

Unique request identifier (generated if not provided)

None
storage Any | None

Optional storage backend for access logs

None
metrics Any | None

Optional PrometheusMetrics instance for active request tracking

None
**initial_context Any

Initial context to include in all log events

{}

Yields:

Name Type Description
RequestContext AsyncGenerator[RequestContext, None]

Context object with timing and logging capabilities

Example

async with request_context(method="POST", path="/v1/messages") as ctx: ctx.add_metadata(model="claude-3-5-sonnet") # Process request ctx.log_event("request_processed", tokens=150) # Context automatically logs success with timing

Source code in ccproxy/observability/context.py
@asynccontextmanager
async def request_context(
    request_id: str | None = None,
    storage: Any | None = None,
    metrics: Any | None = None,
    **initial_context: Any,
) -> AsyncGenerator[RequestContext, None]:
    """
    Context manager for tracking complete request lifecycle with timing.

    Automatically logs request start/success/error events with accurate timing.
    Provides structured logging with request correlation.

    Args:
        request_id: Unique request identifier (generated if not provided)
        storage: Optional storage backend for access logs
        metrics: Optional PrometheusMetrics instance for active request tracking
        **initial_context: Initial context to include in all log events

    Yields:
        RequestContext: Context object with timing and logging capabilities

    Example:
        async with request_context(method="POST", path="/v1/messages") as ctx:
            ctx.add_metadata(model="claude-3-5-sonnet")
            # Process request
            ctx.log_event("request_processed", tokens=150)
            # Context automatically logs success with timing
    """
    if request_id is None:
        request_id = str(uuid.uuid4())

    # Create logger with bound context
    request_logger = logger.bind(request_id=request_id, **initial_context)

    # Record start time
    start_time = time.perf_counter()

    # Log request start
    request_logger.debug(
        "request_start", request_id=request_id, timestamp=time.time(), **initial_context
    )

    # Emit SSE event for real-time dashboard updates
    await _emit_request_start_event(request_id, initial_context)

    # Increment active requests if metrics provided
    if metrics:
        metrics.inc_active_requests()

    # Create context object
    ctx = RequestContext(
        request_id=request_id,
        start_time=start_time,
        logger=request_logger,
        metadata=dict(initial_context),
        storage=storage,
    )

    try:
        yield ctx

        # Log successful completion with comprehensive access log
        duration_ms = ctx.duration_ms

        # Use the new unified access logger for comprehensive logging
        from ccproxy.observability.access_logger import log_request_access

        await log_request_access(
            context=ctx,
            # Extract client info from metadata if available
            client_ip=ctx.metadata.get("client_ip"),
            user_agent=ctx.metadata.get("user_agent"),
            query=ctx.metadata.get("query"),
            storage=ctx.storage,  # Pass storage from context
        )

        # Also keep the original request_success event for debugging
        request_logger.debug(
            "request_success",
            request_id=request_id,
            duration_ms=duration_ms,
            duration_seconds=ctx.duration_seconds,
            **ctx.metadata,
        )

    except Exception as e:
        # Log error with timing
        duration_ms = ctx.duration_ms
        error_type = type(e).__name__

        request_logger.error(
            "request_error",
            request_id=request_id,
            duration_ms=duration_ms,
            duration_seconds=ctx.duration_seconds,
            error_type=error_type,
            error_message=str(e),
            **ctx.metadata,
        )

        # Emit SSE event for real-time dashboard updates
        await _emit_request_error_event(request_id, error_type, str(e), ctx.metadata)

        # Re-raise the exception
        raise
    finally:
        # Decrement active requests if metrics provided
        if metrics:
            metrics.dec_active_requests()

timed_operation async

timed_operation(operation_name, request_id=None, **context)

Context manager for timing individual operations within a request.

Useful for measuring specific parts of request processing like API calls, database queries, or data processing steps.

Parameters:

Name Type Description Default
operation_name str

Name of the operation being timed

required
request_id str | None

Associated request ID for correlation

None
**context Any

Additional context for logging

{}

Yields:

Type Description
AsyncGenerator[dict[str, Any], None]

Dict with timing information and logger

Example

async with timed_operation("claude_api_call", request_id=ctx.request_id) as op: response = await api_client.call() op["response_size"] = len(response) # Automatically logs operation timing

Source code in ccproxy/observability/context.py
@asynccontextmanager
async def timed_operation(
    operation_name: str, request_id: str | None = None, **context: Any
) -> AsyncGenerator[dict[str, Any], None]:
    """
    Context manager for timing individual operations within a request.

    Useful for measuring specific parts of request processing like
    API calls, database queries, or data processing steps.

    Args:
        operation_name: Name of the operation being timed
        request_id: Associated request ID for correlation
        **context: Additional context for logging

    Yields:
        Dict with timing information and logger

    Example:
        async with timed_operation("claude_api_call", request_id=ctx.request_id) as op:
            response = await api_client.call()
            op["response_size"] = len(response)
            # Automatically logs operation timing
    """
    start_time = time.perf_counter()
    operation_id = str(uuid.uuid4())

    # Create operation logger
    op_logger = logger.bind(
        operation_name=operation_name,
        operation_id=operation_id,
        request_id=request_id,
        **context,
    )

    # Log operation start (only for important operations)
    if operation_name in ("claude_api_call", "request_processing", "auth_check"):
        op_logger.debug(
            "operation_start",
            operation_name=operation_name,
            **context,
        )

    # Operation context
    op_context = {
        "operation_id": operation_id,
        "logger": op_logger,
        "start_time": start_time,
    }

    try:
        yield op_context

        # Log successful completion (only for important operations)
        duration_ms = (time.perf_counter() - start_time) * 1000
        if operation_name in ("claude_api_call", "request_processing", "auth_check"):
            op_logger.info(
                "operation_success",
                operation_name=operation_name,
                duration_ms=duration_ms,
                **{
                    k: v
                    for k, v in op_context.items()
                    if k not in ("logger", "start_time")
                },
            )

    except Exception as e:
        # Log operation error
        duration_ms = (time.perf_counter() - start_time) * 1000
        error_type = type(e).__name__

        op_logger.error(
            "operation_error",
            operation_name=operation_name,
            duration_ms=duration_ms,
            error_type=error_type,
            error_message=str(e),
            **{
                k: v for k, v in op_context.items() if k not in ("logger", "start_time")
            },
        )

        # Re-raise the exception
        raise

tracked_request_context async

tracked_request_context(
    request_id=None, storage=None, **initial_context
)

Request context manager that also tracks active requests globally.

Combines request_context() with automatic tracking in the global context tracker for monitoring active request counts.

Parameters:

Name Type Description Default
request_id str | None

Unique request identifier

None
**initial_context Any

Initial context to include in log events

{}

Yields:

Name Type Description
RequestContext AsyncGenerator[RequestContext, None]

Context object with timing and logging

Source code in ccproxy/observability/context.py
@asynccontextmanager
async def tracked_request_context(
    request_id: str | None = None, storage: Any | None = None, **initial_context: Any
) -> AsyncGenerator[RequestContext, None]:
    """
    Request context manager that also tracks active requests globally.

    Combines request_context() with automatic tracking in the global
    context tracker for monitoring active request counts.

    Args:
        request_id: Unique request identifier
        **initial_context: Initial context to include in log events

    Yields:
        RequestContext: Context object with timing and logging
    """
    tracker = get_context_tracker()

    async with request_context(request_id, storage=storage, **initial_context) as ctx:
        # Add to tracker
        await tracker.add_context(ctx)

        try:
            yield ctx
        finally:
            # Remove from tracker
            await tracker.remove_context(ctx.request_id)

get_metrics

get_metrics(
    namespace="ccproxy",
    registry=None,
    pushgateway_client=None,
    settings=None,
)

Get or create global metrics instance with dependency injection.

Parameters:

Name Type Description Default
namespace str

Metric namespace prefix

'ccproxy'
registry CollectorRegistry | None

Custom Prometheus registry

None
pushgateway_client Any | None

Optional pushgateway client for dependency injection

None
settings Any | None

Optional settings instance to avoid circular imports

None

Returns:

Type Description
PrometheusMetrics

PrometheusMetrics instance with full pushgateway support:

PrometheusMetrics
  • push_to_gateway(): Replace all metrics (default)
PrometheusMetrics
  • push_add_to_gateway(): Add metrics to existing job
PrometheusMetrics
  • delete_from_gateway(): Delete all metrics for job
Source code in ccproxy/observability/metrics.py
def get_metrics(
    namespace: str = "ccproxy",
    registry: CollectorRegistry | None = None,
    pushgateway_client: Any | None = None,
    settings: Any | None = None,
) -> PrometheusMetrics:
    """
    Get or create global metrics instance with dependency injection.

    Args:
        namespace: Metric namespace prefix
        registry: Custom Prometheus registry
        pushgateway_client: Optional pushgateway client for dependency injection
        settings: Optional settings instance to avoid circular imports

    Returns:
        PrometheusMetrics instance with full pushgateway support:
        - push_to_gateway(): Replace all metrics (default)
        - push_add_to_gateway(): Add metrics to existing job
        - delete_from_gateway(): Delete all metrics for job
    """
    global _global_metrics

    if _global_metrics is None:
        # Create pushgateway client if not provided via DI
        if pushgateway_client is None:
            from .pushgateway import get_pushgateway_client

            pushgateway_client = get_pushgateway_client()

        _global_metrics = PrometheusMetrics(
            namespace=namespace,
            registry=registry,
            pushgateway_client=pushgateway_client,
        )

    return _global_metrics

reset_metrics

reset_metrics()

Reset global metrics instance (mainly for testing).

Source code in ccproxy/observability/metrics.py
def reset_metrics() -> None:
    """Reset global metrics instance (mainly for testing)."""
    global _global_metrics
    _global_metrics = None

    # Clear Prometheus registry to avoid duplicate metrics in tests
    if PROMETHEUS_AVAILABLE:
        try:
            from prometheus_client import REGISTRY

            # Clear all collectors from the registry
            collectors = list(REGISTRY._collector_to_names.keys())
            for collector in collectors:
                REGISTRY.unregister(collector)
        except Exception:
            # If clearing the registry fails, just continue
            # This is mainly for testing and shouldn't break functionality
            pass

    # Also reset pushgateway client
    from .pushgateway import reset_pushgateway_client

    reset_pushgateway_client()

get_pushgateway_client

get_pushgateway_client()

Get or create global pushgateway client instance.

Source code in ccproxy/observability/pushgateway.py
def get_pushgateway_client() -> PushgatewayClient:
    """Get or create global pushgateway client instance."""
    global _global_pushgateway_client

    if _global_pushgateway_client is None:
        # Import here to avoid circular imports
        from ccproxy.config.settings import get_settings

        settings = get_settings()
        _global_pushgateway_client = PushgatewayClient(settings.observability)

    return _global_pushgateway_client

reset_pushgateway_client

reset_pushgateway_client()

Reset global pushgateway client instance (mainly for testing).

Source code in ccproxy/observability/pushgateway.py
def reset_pushgateway_client() -> None:
    """Reset global pushgateway client instance (mainly for testing)."""
    global _global_pushgateway_client
    _global_pushgateway_client = None