
ccproxy.observability.access_logger

Unified access logging utilities for comprehensive request tracking.

This module provides centralized access-logging functionality used across the application to generate consistent, comprehensive access logs with complete request metadata, including token usage and costs.

log_request_access (async)

log_request_access(
    context,
    status_code=None,
    client_ip=None,
    user_agent=None,
    method=None,
    path=None,
    query=None,
    error_message=None,
    storage=None,
    metrics=None,
    **additional_metadata,
)

Log comprehensive access information for a request.

This function generates a unified access log entry with complete request metadata including timing, tokens, costs, and any additional context. When the optional instances are provided, it also stores the entry in DuckDB and records Prometheus metrics.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | RequestContext | Request context with timing and metadata | required |
| status_code | int \| None | HTTP status code | None |
| client_ip | str \| None | Client IP address | None |
| user_agent | str \| None | User agent string | None |
| method | str \| None | HTTP method | None |
| path | str \| None | Request path | None |
| query | str \| None | Query parameters | None |
| error_message | str \| None | Error message if applicable | None |
| storage | SimpleDuckDBStorage \| None | DuckDB storage instance (optional) | None |
| metrics | PrometheusMetrics \| None | PrometheusMetrics instance for recording metrics (optional) | None |
| **additional_metadata | Any | Any additional fields to include | {} |
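
For example, a minimal sketch of a call at request completion, assuming ctx, duckdb_storage, and prom_metrics come from the application's observability setup (those names are illustrative, not part of this module):

# Inside an async request handler; ctx, duckdb_storage, and prom_metrics
# are assumed to be provided by the application's observability setup.
await log_request_access(
    ctx,
    status_code=200,
    client_ip="203.0.113.7",
    user_agent="curl/8.5.0",
    method="POST",
    path="/v1/messages",
    storage=duckdb_storage,  # optional: also persists the entry in DuckDB
    metrics=prom_metrics,    # optional: also records Prometheus metrics
    endpoint="messages",     # extra keywords land in **additional_metadata
)
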
Source code in ccproxy/observability/access_logger.py
async def log_request_access(
    context: RequestContext,
    status_code: int | None = None,
    client_ip: str | None = None,
    user_agent: str | None = None,
    method: str | None = None,
    path: str | None = None,
    query: str | None = None,
    error_message: str | None = None,
    storage: SimpleDuckDBStorage | None = None,
    metrics: PrometheusMetrics | None = None,
    **additional_metadata: Any,
) -> None:
    """Log comprehensive access information for a request.

    This function generates a unified access log entry with complete request
    metadata including timing, tokens, costs, and any additional context.
    Also stores the access log in DuckDB if available and records Prometheus metrics.

    Args:
        context: Request context with timing and metadata
        status_code: HTTP status code
        client_ip: Client IP address
        user_agent: User agent string
        method: HTTP method
        path: Request path
        query: Query parameters
        error_message: Error message if applicable
        storage: DuckDB storage instance (optional)
        metrics: PrometheusMetrics instance for recording metrics (optional)
        **additional_metadata: Any additional fields to include
    """
    # Extract basic request info from context metadata if not provided
    ctx_metadata = context.metadata
    method = method or ctx_metadata.get("method")
    path = path or ctx_metadata.get("path")
    status_code = status_code or ctx_metadata.get("status_code")

    # Prepare comprehensive log data
    log_data = {
        "request_id": context.request_id,
        "method": method,
        "path": path,
        "query": query,
        "status_code": status_code,
        "client_ip": client_ip,
        "user_agent": user_agent,
        "duration_ms": context.duration_ms,
        "duration_seconds": context.duration_seconds,
        "error_message": error_message,
    }

    # Add token and cost metrics if available
    token_fields = [
        "tokens_input",
        "tokens_output",
        "cache_read_tokens",
        "cache_write_tokens",
        "cost_usd",
        "cost_sdk_usd",
    ]

    for field in token_fields:
        value = ctx_metadata.get(field)
        if value is not None:
            log_data[field] = value

    # Add service and endpoint info
    service_fields = [
        "endpoint",
        "model",
        "streaming",
        "service_type",
    ]

    for field in service_fields:
        value = ctx_metadata.get(field)
        if value is not None:
            log_data[field] = value

    # Add any additional metadata provided
    log_data.update(additional_metadata)

    # Remove None values to keep log clean
    log_data = {k: v for k, v in log_data.items() if v is not None}

    logger = context.logger.bind(**log_data)
    if not log_data.get("streaming", False):
        # Log as access_log event (structured logging)
        logger.info("access_log")
    elif log_data.get("event_type", "") == "streaming_complete":
        logger.info("access_log")
    else:
        # If streaming is true and this is not a streaming_complete event,
        # log at debug level; the real access_log entry comes later
        logger.debug("access_log")

    # Store in DuckDB if available
    await _store_access_log(log_data, storage)

    # Emit SSE event for real-time dashboard updates
    await _emit_access_event("request_complete", log_data)

    # Record Prometheus metrics if metrics instance is provided
    if metrics and not error_message:
        # Extract required values for metrics
        endpoint = ctx_metadata.get("endpoint", path or "unknown")
        model = ctx_metadata.get("model")
        service_type = ctx_metadata.get("service_type")

        # Record request count
        if method and status_code:
            metrics.record_request(
                method=method,
                endpoint=endpoint,
                model=model,
                status=status_code,
                service_type=service_type,
            )

        # Record response time
        if context.duration_seconds > 0:
            metrics.record_response_time(
                duration_seconds=context.duration_seconds,
                model=model,
                endpoint=endpoint,
                service_type=service_type,
            )

        # Record token usage
        tokens_input = ctx_metadata.get("tokens_input")
        if tokens_input:
            metrics.record_tokens(
                token_count=tokens_input,
                token_type="input",
                model=model,
                service_type=service_type,
            )

        tokens_output = ctx_metadata.get("tokens_output")
        if tokens_output:
            metrics.record_tokens(
                token_count=tokens_output,
                token_type="output",
                model=model,
                service_type=service_type,
            )

        cache_read_tokens = ctx_metadata.get("cache_read_tokens")
        if cache_read_tokens:
            metrics.record_tokens(
                token_count=cache_read_tokens,
                token_type="cache_read",
                model=model,
                service_type=service_type,
            )

        cache_write_tokens = ctx_metadata.get("cache_write_tokens")
        if cache_write_tokens:
            metrics.record_tokens(
                token_count=cache_write_tokens,
                token_type="cache_write",
                model=model,
                service_type=service_type,
            )

        # Record cost
        cost_usd = ctx_metadata.get("cost_usd")
        if cost_usd:
            metrics.record_cost(
                cost_usd=cost_usd,
                model=model,
                cost_type="total",
                service_type=service_type,
            )

    # Record error if there was one
    if metrics and error_message:
        endpoint = ctx_metadata.get("endpoint", path or "unknown")
        model = ctx_metadata.get("model")
        service_type = ctx_metadata.get("service_type")

        # Extract the error type from additional metadata, falling back to a
        # generic label (error_message is a plain string, so inspecting its
        # runtime type would always yield "str")
        error_type = additional_metadata.get("error_type", "unknown_error")

        metrics.record_error(
            error_type=error_type,
            endpoint=endpoint,
            model=model,
            service_type=service_type,
        )
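
Note the streaming branch above: while streaming is true and no streaming_complete event has arrived, the entry is logged at debug level; the info-level access_log line is only emitted by the completing call. A hedged sketch of signalling that completion (event_type travels via **additional_metadata; ctx is assumed to be the request context):

# After the final streamed chunk, inside an async handler.
# event_type="streaming_complete" promotes the entry from debug to info
# in the branch shown above.
await log_request_access(
    ctx,
    status_code=200,
    event_type="streaming_complete",
)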

log_request_start

log_request_start(
    request_id,
    method,
    path,
    client_ip=None,
    user_agent=None,
    query=None,
    **additional_metadata,
)

Log request start event with basic information.

This is used for early/middleware logging when full context isn't available yet.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| request_id | str | Request identifier | required |
| method | str | HTTP method | required |
| path | str | Request path | required |
| client_ip | str \| None | Client IP address | None |
| user_agent | str \| None | User agent string | None |
| query | str \| None | Query parameters | None |
| **additional_metadata | Any | Any additional fields to include | {} |
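
Because log_request_start is synchronous, it can be called directly from middleware before the request context exists. A minimal sketch using Starlette-style middleware (the framework pieces are assumptions; only log_request_start comes from this module):

import uuid

from starlette.middleware.base import BaseHTTPMiddleware

from ccproxy.observability.access_logger import log_request_start

class AccessLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Early log entry; the comprehensive access_log is emitted later,
        # once the full request context is available
        log_request_start(
            request_id=str(uuid.uuid4()),
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None,
            user_agent=request.headers.get("user-agent"),
            query=str(request.url.query) or None,
        )
        return await call_next(request)
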
Source code in ccproxy/observability/access_logger.py
def log_request_start(
    request_id: str,
    method: str,
    path: str,
    client_ip: str | None = None,
    user_agent: str | None = None,
    query: str | None = None,
    **additional_metadata: Any,
) -> None:
    """Log request start event with basic information.

    This is used for early/middleware logging when full context isn't available yet.

    Args:
        request_id: Request identifier
        method: HTTP method
        path: Request path
        client_ip: Client IP address
        user_agent: User agent string
        query: Query parameters
        **additional_metadata: Any additional fields to include
    """
    log_data = {
        "request_id": request_id,
        "method": method,
        "path": path,
        "client_ip": client_ip,
        "user_agent": user_agent,
        "query": query,
        "event_type": "request_start",
        "timestamp": time.time(),
    }

    # Add any additional metadata
    log_data.update(additional_metadata)

    # Remove None values
    log_data = {k: v for k, v in log_data.items() if v is not None}

    logger.debug("access_log_start", **log_data)

    # Emit SSE event for real-time dashboard updates
    # Note: This is a synchronous function, so we schedule the async emission
    try:
        import asyncio

        from ccproxy.observability.sse_events import emit_sse_event

        # Create event data for SSE
        sse_data = {
            "request_id": request_id,
            "method": method,
            "path": path,
            "client_ip": client_ip,
            "user_agent": user_agent,
            "query": query,
        }

        # Remove None values
        sse_data = {k: v for k, v in sse_data.items() if v is not None}

        # Schedule async event emission
        asyncio.create_task(emit_sse_event("request_start", sse_data))

    except Exception as e:
        # Log error but don't fail the request
        logger.debug(
            "sse_emit_failed",
            event_type="request_start",
            error=str(e),
            request_id=request_id,
        )
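
The emission above depends on a running event loop: asyncio.create_task raises RuntimeError when called outside of one, which the broad except absorbs so that telemetry can never fail a request. A minimal sketch of that schedule-from-sync pattern in isolation (_emit is a stand-in for emit_sse_event, not the real function):

import asyncio

async def _emit(event: str, data: dict) -> None:
    # Stand-in for an async emitter such as emit_sse_event
    print(event, data)

def fire_and_forget(event: str, data: dict) -> None:
    try:
        # Requires a running loop; raises RuntimeError otherwise
        loop = asyncio.get_running_loop()
        loop.create_task(_emit(event, data))
    except RuntimeError:
        # No running loop (plain sync context): drop the event rather
        # than failing the caller
        pass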