Skip to content

ccproxy.observability.pushgateway

ccproxy.observability.pushgateway

Prometheus Pushgateway integration for batch metrics.

CircuitBreaker

CircuitBreaker(failure_threshold=5, recovery_timeout=60.0)

Simple circuit breaker for pushgateway operations.

Source code in ccproxy/observability/pushgateway.py
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
    self.failure_threshold = failure_threshold
    self.recovery_timeout = recovery_timeout
    self.failure_count = 0
    self.last_failure_time = 0.0
    self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

can_execute

can_execute()

Check if operation can be executed.

Source code in ccproxy/observability/pushgateway.py
def can_execute(self) -> bool:
    """Check if operation can be executed."""
    if self.state == "CLOSED":
        return True
    elif self.state == "OPEN":
        if time.time() - self.last_failure_time > self.recovery_timeout:
            self.state = "HALF_OPEN"
            return True
        return False
    else:  # HALF_OPEN
        return True

record_success

record_success()

Record successful operation.

Source code in ccproxy/observability/pushgateway.py
def record_success(self) -> None:
    """Record successful operation."""
    self.failure_count = 0
    self.state = "CLOSED"

record_failure

record_failure()

Record failed operation.

Source code in ccproxy/observability/pushgateway.py
def record_failure(self) -> None:
    """Record failed operation."""
    self.failure_count += 1
    self.last_failure_time = time.time()

    if self.failure_count >= self.failure_threshold:
        self.state = "OPEN"
        logger.warning(
            "pushgateway_circuit_breaker_opened",
            failure_count=self.failure_count,
            recovery_timeout=self.recovery_timeout,
        )

PushgatewayClient

PushgatewayClient(settings)

Prometheus Pushgateway client using official prometheus_client methods.

Supports standard pushgateway operations: - push_to_gateway(): Replace all metrics for job/instance - pushadd_to_gateway(): Add metrics to existing job/instance - delete_from_gateway(): Delete metrics for job/instance

Also supports VictoriaMetrics remote write protocol for compatibility.

Parameters:

Name Type Description Default
settings ObservabilitySettings

Observability configuration settings

required
Source code in ccproxy/observability/pushgateway.py
def __init__(self, settings: ObservabilitySettings) -> None:
    """Initialize Pushgateway client.

    Args:
        settings: Observability configuration settings
    """
    self.settings = settings
    # Pushgateway is enabled if URL is configured and prometheus_client is available
    self._enabled = PROMETHEUS_AVAILABLE and bool(settings.pushgateway_url)
    self._circuit_breaker = CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60.0,
    )

    # Only log if pushgateway URL is configured but prometheus is not available
    if settings.pushgateway_url and not PROMETHEUS_AVAILABLE:
        logger.warning(
            "prometheus_client not available. Pushgateway will be disabled. "
            "Install with: pip install prometheus-client"
        )

push_metrics

push_metrics(registry, method='push')

Push metrics to Pushgateway using official prometheus_client methods.

Parameters:

Name Type Description Default
registry CollectorRegistry

Prometheus metrics registry to push

required
method str

Push method - "push" (replace), "pushadd" (add), or "delete"

'push'

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def push_metrics(self, registry: CollectorRegistry, method: str = "push") -> bool:
    """Push metrics to Pushgateway using official prometheus_client methods.

    Args:
        registry: Prometheus metrics registry to push
        method: Push method - "push" (replace), "pushadd" (add), or "delete"

    Returns:
        True if push succeeded, False otherwise
    """

    if not self._enabled or not self.settings.pushgateway_url:
        return False

    # Check circuit breaker before attempting operation
    if not self._circuit_breaker.can_execute():
        logger.debug(
            "pushgateway_circuit_breaker_blocking",
            state=self._circuit_breaker.state,
            failure_count=self._circuit_breaker.failure_count,
        )
        return False

    try:
        # Check if URL looks like VictoriaMetrics remote write endpoint
        if "/api/v1/write" in self.settings.pushgateway_url:
            success = self._push_remote_write(registry)
        else:
            success = self._push_standard(registry, method)

        if success:
            self._circuit_breaker.record_success()
        else:
            self._circuit_breaker.record_failure()

        return success

    except Exception as e:
        self._circuit_breaker.record_failure()
        logger.error(
            "pushgateway_push_failed",
            url=self.settings.pushgateway_url,
            job=self.settings.pushgateway_job,
            method=method,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

push_add_metrics

push_add_metrics(registry)

Add metrics to existing job/instance (pushadd operation).

Parameters:

Name Type Description Default
registry CollectorRegistry

Prometheus metrics registry to add

required

Returns:

Type Description
bool

True if push succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def push_add_metrics(self, registry: CollectorRegistry) -> bool:
    """Add metrics to existing job/instance (pushadd operation).

    Args:
        registry: Prometheus metrics registry to add

    Returns:
        True if push succeeded, False otherwise
    """
    return self.push_metrics(registry, method="pushadd")

delete_metrics

delete_metrics()

Delete all metrics for the configured job.

Returns:

Type Description
bool

True if delete succeeded, False otherwise

Source code in ccproxy/observability/pushgateway.py
def delete_metrics(self) -> bool:
    """Delete all metrics for the configured job.

    Returns:
        True if delete succeeded, False otherwise
    """

    if not self._enabled or not self.settings.pushgateway_url:
        return False

    # Check circuit breaker before attempting operation
    if not self._circuit_breaker.can_execute():
        logger.debug(
            "pushgateway_circuit_breaker_blocking_delete",
            state=self._circuit_breaker.state,
            failure_count=self._circuit_breaker.failure_count,
        )
        return False

    try:
        # Only standard pushgateway supports delete operation
        if "/api/v1/write" in self.settings.pushgateway_url:
            logger.warning("pushgateway_delete_not_supported_for_remote_write")
            return False
        else:
            success = self._push_standard(None, method="delete")  # type: ignore[arg-type]

            if success:
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()

            return success

    except Exception as e:
        self._circuit_breaker.record_failure()
        logger.error(
            "pushgateway_delete_failed",
            url=self.settings.pushgateway_url,
            job=self.settings.pushgateway_job,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

is_enabled

is_enabled()

Check if Pushgateway client is enabled and configured.

Source code in ccproxy/observability/pushgateway.py
def is_enabled(self) -> bool:
    """Check if Pushgateway client is enabled and configured."""
    return self._enabled and bool(self.settings.pushgateway_url)

get_pushgateway_client

get_pushgateway_client()

Get or create global pushgateway client instance.

Source code in ccproxy/observability/pushgateway.py
def get_pushgateway_client() -> PushgatewayClient:
    """Get or create global pushgateway client instance."""
    global _global_pushgateway_client

    if _global_pushgateway_client is None:
        # Import here to avoid circular imports
        from ccproxy.config.settings import get_settings

        settings = get_settings()
        _global_pushgateway_client = PushgatewayClient(settings.observability)

    return _global_pushgateway_client

reset_pushgateway_client

reset_pushgateway_client()

Reset global pushgateway client instance (mainly for testing).

Source code in ccproxy/observability/pushgateway.py
def reset_pushgateway_client() -> None:
    """Reset global pushgateway client instance (mainly for testing)."""
    global _global_pushgateway_client
    _global_pushgateway_client = None