Skip to content

ccproxy.utils.simple_request_logger

ccproxy.utils.simple_request_logger

Simple request logging utility for content logging across all service layers.

should_log_requests

should_log_requests()

Check if request logging is enabled via environment variable.

Returns:

Type Description
bool

True if CCPROXY_LOG_REQUESTS is set to 'true' (case-insensitive)

Source code in ccproxy/utils/simple_request_logger.py
def should_log_requests() -> bool:
    """Check whether request content logging is turned on.

    Returns:
        True if the CCPROXY_LOG_REQUESTS environment variable is set to
        'true' (case-insensitive); False otherwise or when unset.
    """
    flag = os.environ.get("CCPROXY_LOG_REQUESTS", "false")
    return flag.casefold() == "true"

get_request_log_dir

get_request_log_dir()

Get the request log directory from environment variable.

Returns:

Type Description
Path | None

Path object if CCPROXY_REQUEST_LOG_DIR is set and valid, None otherwise

Source code in ccproxy/utils/simple_request_logger.py
def get_request_log_dir() -> Path | None:
    """Get the request log directory from environment variable.

    Returns:
        Path object if CCPROXY_REQUEST_LOG_DIR is set and valid, None otherwise
    """
    log_dir = os.environ.get("CCPROXY_REQUEST_LOG_DIR")
    if not log_dir:
        return None

    path = Path(log_dir)
    try:
        path.mkdir(parents=True, exist_ok=True)
        return path
    except Exception as e:
        logger.error(
            "failed_to_create_request_log_dir",
            log_dir=log_dir,
            error=str(e),
        )
        return None

get_timestamp_prefix

get_timestamp_prefix()

Generate timestamp prefix in YYYYMMDDhhmmss format.

Returns:

Type Description
str

Timestamp string in YYYYMMDDhhmmss format (UTC)

Source code in ccproxy/utils/simple_request_logger.py
def get_timestamp_prefix() -> str:
    """Generate timestamp prefix in YYYYMMDDhhmmss format.

    Returns:
        Timestamp string in YYYYMMDDhhmmss format (UTC)
    """
    return datetime.now(UTC).strftime("%Y%m%d%H%M%S")

write_request_log async

write_request_log(
    request_id, log_type, data, timestamp=None
)

Write request/response data to JSON file.

Parameters:

Name Type Description Default
request_id str

Unique request identifier

required
log_type str

Type of log (e.g., 'middleware_request', 'upstream_response')

required
data dict[str, Any]

Data to log as JSON

required
timestamp str | None

Optional timestamp prefix (defaults to current time)

None
Source code in ccproxy/utils/simple_request_logger.py
async def write_request_log(
    request_id: str,
    log_type: str,
    data: dict[str, Any],
    timestamp: str | None = None,
) -> None:
    """Write request/response data to a JSON file.

    The file is named ``{timestamp}_{request_id}_{log_type}.json`` inside the
    configured log directory. This is a no-op unless request logging is
    enabled and a usable log directory exists. Write failures are logged and
    swallowed so logging can never break request handling.

    Args:
        request_id: Unique request identifier
        log_type: Type of log (e.g., 'middleware_request', 'upstream_response')
        data: Data to log as JSON
        timestamp: Optional timestamp prefix (defaults to current time)
    """
    if not should_log_requests():
        return

    log_dir = get_request_log_dir()
    if not log_dir:
        return

    timestamp = timestamp or get_timestamp_prefix()
    filename = f"{timestamp}_{request_id}_{log_type}.json"
    file_path = log_dir / filename

    def write_file() -> None:
        # default=str stringifies non-JSON-serializable values instead of
        # raising, so arbitrary payloads can be logged.
        with file_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str, ensure_ascii=False)

    try:
        # Run the blocking file write in a worker thread. asyncio.to_thread
        # replaces the deprecated asyncio.get_event_loop().run_in_executor
        # pattern (get_event_loop is deprecated in coroutines since 3.10).
        await asyncio.to_thread(write_file)

        logger.debug(
            "request_log_written",
            request_id=request_id,
            log_type=log_type,
            file_path=str(file_path),
        )

    except Exception as e:
        logger.error(
            "failed_to_write_request_log",
            request_id=request_id,
            log_type=log_type,
            file_path=str(file_path),
            error=str(e),
        )

write_streaming_log async

write_streaming_log(
    request_id, log_type, data, timestamp=None
)

Write streaming data to raw file.

Parameters:

Name Type Description Default
request_id str

Unique request identifier

required
log_type str

Type of log (e.g., 'middleware_streaming', 'upstream_streaming')

required
data bytes

Raw bytes to log

required
timestamp str | None

Optional timestamp prefix (defaults to current time)

None
Source code in ccproxy/utils/simple_request_logger.py
async def write_streaming_log(
    request_id: str,
    log_type: str,
    data: bytes,
    timestamp: str | None = None,
) -> None:
    """Write streaming data to a raw binary file.

    The file is named ``{timestamp}_{request_id}_{log_type}.raw`` inside the
    configured log directory. This is a no-op unless request logging is
    enabled and a usable log directory exists. Write failures are logged and
    swallowed so logging can never break request handling.

    Args:
        request_id: Unique request identifier
        log_type: Type of log (e.g., 'middleware_streaming', 'upstream_streaming')
        data: Raw bytes to log
        timestamp: Optional timestamp prefix (defaults to current time)
    """
    if not should_log_requests():
        return

    log_dir = get_request_log_dir()
    if not log_dir:
        return

    timestamp = timestamp or get_timestamp_prefix()
    filename = f"{timestamp}_{request_id}_{log_type}.raw"
    file_path = log_dir / filename

    def write_file() -> None:
        with file_path.open("wb") as f:
            f.write(data)

    try:
        # Run the blocking file write in a worker thread. asyncio.to_thread
        # replaces the deprecated asyncio.get_event_loop().run_in_executor
        # pattern (get_event_loop is deprecated in coroutines since 3.10).
        await asyncio.to_thread(write_file)

        logger.debug(
            "streaming_log_written",
            request_id=request_id,
            log_type=log_type,
            file_path=str(file_path),
            data_size=len(data),
        )

    except Exception as e:
        logger.error(
            "failed_to_write_streaming_log",
            request_id=request_id,
            log_type=log_type,
            file_path=str(file_path),
            error=str(e),
        )

append_streaming_log async

append_streaming_log(
    request_id, log_type, data, timestamp=None
)

Append streaming data using batching for performance.

Parameters:

Name Type Description Default
request_id str

Unique request identifier

required
log_type str

Type of log (e.g., 'middleware_streaming', 'upstream_streaming')

required
data bytes

Raw bytes to append

required
timestamp str | None

Optional timestamp prefix (defaults to current time)

None
Source code in ccproxy/utils/simple_request_logger.py
async def append_streaming_log(
    request_id: str,
    log_type: str,
    data: bytes,
    timestamp: str | None = None,
) -> None:
    """Append streaming data using batching for performance.

    Chunks are accumulated per ``(request_id, log_type)`` in the module-level
    ``_streaming_batches`` dict and flushed either immediately (when the
    batch reaches ``_STREAMING_BATCH_SIZE`` bytes or 50 chunks) or via a
    delayed flush task after ``_STREAMING_BATCH_TIMEOUT``. Each new chunk
    cancels any pending delayed flush and reschedules it, so a quiet stream
    is flushed at most one timeout after its last chunk.

    Args:
        request_id: Unique request identifier
        log_type: Type of log (e.g., 'middleware_streaming', 'upstream_streaming')
        data: Raw bytes to append
        timestamp: Optional timestamp prefix (defaults to current time)
    """
    if not should_log_requests():
        return

    log_dir = get_request_log_dir()
    if not log_dir:
        return

    timestamp = timestamp or get_timestamp_prefix()
    batch_key = f"{request_id}_{log_type}"

    # Get or create batch for this request/log_type combination.
    # The first chunk's timestamp wins, so all flushes for a stream share
    # one filename prefix.
    if batch_key not in _streaming_batches:
        _streaming_batches[batch_key] = {
            "request_id": request_id,
            "log_type": log_type,
            "timestamp": timestamp,
            "data": bytearray(),
            "chunk_count": 0,
            # get_running_loop() is the non-deprecated way to reach the loop
            # from inside a coroutine (get_event_loop is deprecated here
            # since 3.10).
            "first_chunk_time": asyncio.get_running_loop().time(),
            "last_flush_task": None,
        }

    batch = _streaming_batches[batch_key]
    batch["data"].extend(data)
    batch["chunk_count"] += 1

    # Cancel the previous delayed flush so the timeout restarts from this
    # chunk rather than the first one.
    if batch["last_flush_task"] and not batch["last_flush_task"].done():
        batch["last_flush_task"].cancel()

    # Flush immediately when the batch is large enough.
    should_flush = (
        len(batch["data"]) >= _STREAMING_BATCH_SIZE
        or batch["chunk_count"] >= 50  # Max 50 chunks per batch
    )

    if should_flush:
        await _flush_streaming_batch(batch_key)
    else:
        # Otherwise schedule a delayed flush in case no more chunks arrive.
        batch["last_flush_task"] = asyncio.create_task(
            _delayed_flush_streaming_batch(batch_key, _STREAMING_BATCH_TIMEOUT)
        )

flush_all_streaming_batches async

flush_all_streaming_batches()

Flush all pending streaming batches. Call this on shutdown.

Source code in ccproxy/utils/simple_request_logger.py
async def flush_all_streaming_batches() -> None:
    """Flush every pending streaming batch; intended for shutdown.

    Iterates over a snapshot of the batch keys because flushing removes
    entries from the underlying dict.
    """
    for key in list(_streaming_batches):
        await _flush_streaming_batch(key)