ccproxy.core.plugins.hooks.implementations.formatters

Core formatters for HTTP request/response logging.

These formatters are used by the core HTTP tracer hook and can be shared across different plugins that need HTTP logging capabilities.
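
As a quick orientation, here is a minimal sketch of driving the JSON formatter directly from a plugin. It assumes the import path matches the source file shown below (ccproxy/core/plugins/hooks/implementations/formatters/json.py) and uses only the constructor and the log_request/log_response signatures documented on this page; the request id, URL, and header values are illustrative.

import asyncio

from ccproxy.core.plugins.hooks.implementations.formatters.json import JSONFormatter


async def trace_roundtrip() -> None:
    # Writes per-request JSON trace files under /tmp/ccproxy/traces.
    formatter = JSONFormatter(log_dir="/tmp/ccproxy/traces", redact_sensitive=True)

    await formatter.log_request(
        request_id="req-123",
        method="POST",
        url="https://api.example.com/v1/messages",
        headers={"authorization": "Bearer secret", "content-type": "application/json"},
        body=b'{"model": "example", "stream": false}',
        request_type="client",
    )
    await formatter.log_response(
        request_id="req-123",
        status=200,
        headers={"content-type": "application/json"},
        body=b'{"ok": true}',
        response_type="client",
    )


asyncio.run(trace_roundtrip())

Sensitive header values are redacted in the console log, while the full headers are kept in the written JSON files.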

JSONFormatter

JSONFormatter(
    log_dir="/tmp/ccproxy/traces",
    verbose_api=True,
    json_logs_enabled=True,
    redact_sensitive=True,
    truncate_body_preview=1024,
)

Formats requests/responses as structured JSON for observability.

Parameters:

Name                    Type   Description                  Default
log_dir                 str    Directory for log files      '/tmp/ccproxy/traces'
verbose_api             bool   Enable verbose API logging   True
json_logs_enabled       bool   Enable JSON file logging     True
redact_sensitive        bool   Redact sensitive headers     True
truncate_body_preview   int    Max body preview size        1024

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
def __init__(
    self,
    log_dir: str = "/tmp/ccproxy/traces",
    verbose_api: bool = True,
    json_logs_enabled: bool = True,
    redact_sensitive: bool = True,
    truncate_body_preview: int = 1024,
) -> None:
    """Initialize with configuration.

    Args:
        log_dir: Directory for log files
        verbose_api: Enable verbose API logging
        json_logs_enabled: Enable JSON file logging
        redact_sensitive: Redact sensitive headers
        truncate_body_preview: Max body preview size
    """
    self.log_dir = log_dir
    self.verbose_api = verbose_api
    self.json_logs_enabled = json_logs_enabled
    self.redact_sensitive = redact_sensitive
    self.truncate_body_preview = truncate_body_preview

    # Check if TRACE level is enabled
    current_level = (
        logger._context.get("_level", logging.INFO)
        if hasattr(logger, "_context")
        else logging.INFO
    )
    self.trace_enabled = self.verbose_api or current_level <= TRACE_LEVEL

    # Setup log directory if file logging is enabled
    self.request_log_dir = None
    if self.json_logs_enabled:
        self.request_log_dir = Path(log_dir)
        self.request_log_dir.mkdir(parents=True, exist_ok=True)

from_config classmethod

from_config(config)

Create JSONFormatter from a RequestTracerConfig.

Parameters:

Name     Type   Description                    Default
config   Any    RequestTracerConfig instance   required

Returns:

Type            Description
JSONFormatter   JSONFormatter instance

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
@classmethod
def from_config(cls, config: Any) -> "JSONFormatter":
    """Create JSONFormatter from a RequestTracerConfig.

    Args:
        config: RequestTracerConfig instance

    Returns:
        JSONFormatter instance
    """
    return cls(
        log_dir=config.get_json_log_dir(),
        verbose_api=config.verbose_api,
        json_logs_enabled=config.json_logs_enabled,
        redact_sensitive=config.redact_sensitive,
        truncate_body_preview=config.truncate_body_preview,
    )
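
from_config only touches one method and four attributes of the config object, so any object exposing them works for experimentation. The stand-in below is hypothetical and only mirrors the accessors used above; in real deployments the RequestTracerConfig provided by ccproxy should be passed instead.

from dataclasses import dataclass

from ccproxy.core.plugins.hooks.implementations.formatters.json import JSONFormatter


@dataclass
class _StubTracerConfig:
    # Hypothetical stand-in exposing only what JSONFormatter.from_config reads.
    verbose_api: bool = True
    json_logs_enabled: bool = True
    redact_sensitive: bool = True
    truncate_body_preview: int = 1024

    def get_json_log_dir(self) -> str:
        return "/tmp/ccproxy/traces"


formatter = JSONFormatter.from_config(_StubTracerConfig())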

redact_headers staticmethod

redact_headers(headers)

Redact sensitive headers for safe logging.

  • Replaces authorization, x-api-key, cookie values with [REDACTED]
  • Preserves header names for debugging
  • Returns new dict without modifying original
Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
@staticmethod
def redact_headers(headers: dict[str, str]) -> dict[str, str]:
    """Redact sensitive headers for safe logging.

    - Replaces authorization, x-api-key, cookie values with [REDACTED]
    - Preserves header names for debugging
    - Returns new dict without modifying original
    """
    sensitive_headers = {
        "authorization",
        "x-api-key",
        "api-key",
        "cookie",
        "x-auth-token",
        "x-secret-key",
    }

    redacted = {}
    for key, value in headers.items():
        if key.lower() in sensitive_headers:
            redacted[key] = "[REDACTED]"
        else:
            redacted[key] = value
    return redacted
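
A short illustration of the redaction behavior (header values are made up):

from ccproxy.core.plugins.hooks.implementations.formatters.json import JSONFormatter

headers = {
    "Authorization": "Bearer sk-secret",
    "X-Api-Key": "abc123",
    "Content-Type": "application/json",
}
print(JSONFormatter.redact_headers(headers))
# {'Authorization': '[REDACTED]', 'X-Api-Key': '[REDACTED]', 'Content-Type': 'application/json'}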

log_request async

log_request(
    request_id,
    method,
    url,
    headers,
    body,
    request_type="provider",
    context=None,
    hook_type=None,
)

Log structured request data.

  • Logs at TRACE level with redacted headers
  • Writes to request log file with complete data (if configured)
Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_request(
    self,
    request_id: str,
    method: str,
    url: str,
    headers: HookHeaders | dict[str, str],
    body: bytes | None,
    request_type: str = "provider",  # "client" or "provider"
    context: Any = None,  # RequestContext
    hook_type: str | None = None,  # Hook type for filename (e.g., "tracer", "http")
) -> None:
    """Log structured request data.

    - Logs at TRACE level with redacted headers
    - Writes to request log file with complete data (if configured)
    """
    if not self.trace_enabled:
        return

    # Normalize headers (preserve order/case if dict-like)
    headers_dict = (
        headers.to_dict() if hasattr(headers, "to_dict") else dict(headers)
    )

    # Log at TRACE level with redacted headers
    log_headers = (
        self.redact_headers(headers_dict) if self.redact_sensitive else headers_dict
    )

    if hasattr(logger, "trace"):
        logger.trace(
            "api_request",
            category="http",
            request_id=request_id,
            method=method,
            url=url,
            headers=log_headers,
            body_size=len(body) if body else 0,
        )
    elif self.verbose_api:
        # Fallback for backward compatibility
        logger.info(
            "api_request",
            category="http",
            request_id=request_id,
            method=method,
            url=url,
            headers=log_headers,
            body_size=len(body) if body else 0,
        )

    # Write to file if configured
    if self.request_log_dir and self.json_logs_enabled:
        # Build file suffix with hook type
        base_suffix = (
            f"{request_type}_request" if request_type != "provider" else "request"
        )
        if hook_type:
            file_suffix = f"{base_suffix}_{hook_type}"
        else:
            file_suffix = base_suffix

        base_id = self._compose_file_id_with_timestamp(request_id)
        request_file = self.request_log_dir / f"{base_id}_{file_suffix}.json"

        # Handle body content - could be bytes, dict/list (from JSON), or string
        body_content = None
        if body is not None:
            if isinstance(body, dict | list):
                # Already parsed JSON object from hook context
                body_content = body
            elif isinstance(body, bytes):
                # Raw bytes - try to parse as JSON first, then string, then base64
                try:
                    # First try to decode as UTF-8 string
                    body_str = body.decode("utf-8")
                    # Then try to parse as JSON
                    body_content = json.loads(body_str)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Not JSON, try plain string
                    try:
                        body_content = body.decode("utf-8", errors="replace")
                    except Exception:
                        # Last resort: encode as base64
                        body_content = {
                            "_type": "base64",
                            "data": base64.b64encode(body).decode("ascii"),
                        }
            elif isinstance(body, str):
                # String body - try to parse as JSON, otherwise keep as string
                try:
                    body_content = json.loads(body)
                except json.JSONDecodeError:
                    body_content = body
            else:
                # Other type - convert to string
                body_content = str(body)

        request_data = {
            "request_id": request_id,
            "method": method,
            "url": url,
            "headers": headers_dict,  # Full headers in file
            "body": body_content,
            "type": request_type,
        }

        # Add cmd_id for CLI correlation if present
        cmd_id = self._current_cmd_id()
        if cmd_id:
            request_data["cmd_id"] = cmd_id

        # Add context data if available
        if context and hasattr(context, "to_dict"):
            try:
                context_data = context.to_dict()
                if context_data:
                    request_data["context"] = context_data
            except Exception as e:
                logger.debug(
                    "context_serialization_error",
                    error=str(e),
                    request_id=request_id,
                )

        request_file.write_text(json.dumps(request_data, indent=2, default=str))
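
Although the parameter is annotated as bytes | None, the implementation above also accepts already-parsed dict or list bodies from a hook context: dicts and lists are written as-is, bytes are tried as JSON, then as UTF-8 text, then base64. A sketch exercising two of those paths (all identifiers and payloads are illustrative):

import asyncio

from ccproxy.core.plugins.hooks.implementations.formatters.json import JSONFormatter


async def main() -> None:
    formatter = JSONFormatter()

    # Already-parsed JSON from a hook context is written unchanged;
    # hook_type is appended to the file suffix (..._request_tracer.json).
    await formatter.log_request(
        request_id="req-1",
        method="POST",
        url="https://api.example.com/v1/messages",
        headers={"content-type": "application/json"},
        body={"model": "example", "max_tokens": 16},
        hook_type="tracer",
    )

    # Bytes that are neither JSON nor valid UTF-8 fall back to a
    # replacement-character string (and base64 as a last resort).
    await formatter.log_request(
        request_id="req-2",
        method="POST",
        url="https://api.example.com/upload",
        headers={"content-type": "application/octet-stream"},
        body=b"\xff\xfebinary payload",
    )


asyncio.run(main())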

log_response async

log_response(
    request_id,
    status,
    headers,
    body,
    response_type="provider",
    context=None,
    hook_type=None,
)

Log structured response data.

  • Logs at TRACE level
  • Truncates body preview for console
  • Handles binary data gracefully
Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_response(
    self,
    request_id: str,
    status: int,
    headers: HookHeaders | dict[str, str],
    body: bytes,
    response_type: str = "provider",  # "client" or "provider"
    context: Any = None,  # RequestContext
    hook_type: str | None = None,  # Hook type for filename (e.g., "tracer", "http")
) -> None:
    """Log structured response data.

    - Logs at TRACE level
    - Truncates body preview for console
    - Handles binary data gracefully
    """
    if not self.trace_enabled:
        return

    body_preview = self._get_body_preview(body)

    # Normalize headers (preserve order/case if dict-like)
    headers_dict = (
        headers.to_dict() if hasattr(headers, "to_dict") else dict(headers)
    )

    # Log at TRACE level
    if hasattr(logger, "trace"):
        logger.trace(
            "api_response",
            category="http",
            request_id=request_id,
            status=status,
            headers=headers_dict,
            body_preview=body_preview,
            body_size=len(body),
        )
    else:
        # Fallback for backward compatibility
        logger.info(
            "api_response",
            category="http",
            request_id=request_id,
            status=status,
            headers=headers_dict,
            body_preview=body_preview,
            body_size=len(body),
        )

    # Write to file if configured
    if self.request_log_dir and self.json_logs_enabled:
        # Build file suffix with hook type
        base_suffix = (
            f"{response_type}_response"
            if response_type != "provider"
            else "response"
        )
        if hook_type:
            file_suffix = f"{base_suffix}_{hook_type}"
        else:
            file_suffix = base_suffix
        logger.debug(
            "Writing response JSON file",
            request_id=request_id,
            status=status,
            response_type=response_type,
            file_suffix=file_suffix,
            body_type=type(body).__name__,
            body_size=len(body) if body else 0,
            body_preview=body[:100] if body else None,
        )
        base_id = self._compose_file_id_with_timestamp(request_id)
        response_file = self.request_log_dir / f"{base_id}_{file_suffix}.json"

        # Try to parse body as JSON first, then string, then base64
        body_content: str | dict[str, Any] = ""
        if body:
            try:
                # First try to decode as UTF-8 string
                body_str = body.decode("utf-8")
                # Then try to parse as JSON
                body_content = json.loads(body_str)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not JSON, try plain string
                try:
                    body_content = body.decode("utf-8", errors="replace")
                except Exception:
                    # Last resort: encode as base64
                    import base64

                    body_content = {
                        "_type": "base64",
                        "data": base64.b64encode(body).decode("ascii"),
                    }

        response_data = {
            "request_id": request_id,
            "status": status,
            "headers": headers_dict,
            "body": body_content,
            "type": response_type,
        }

        # Add cmd_id for CLI correlation if present
        cmd_id = self._current_cmd_id()
        if cmd_id:
            response_data["cmd_id"] = cmd_id

        # Add context data if available (including cost/metrics)
        if context and hasattr(context, "to_dict"):
            try:
                context_data = context.to_dict()
                if context_data:
                    response_data["context"] = context_data
            except Exception as e:
                logger.debug(
                    "context_serialization_error",
                    error=str(e),
                    request_id=request_id,
                )

        response_file.write_text(json.dumps(response_data, indent=2, default=str))
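
When a context object exposing to_dict() is passed, its serialized form (for example cost or usage metrics) is embedded under "context" in the written file. The context class below is a hypothetical stand-in for a RequestContext:

import asyncio

from ccproxy.core.plugins.hooks.implementations.formatters.json import JSONFormatter


class _StubContext:
    # Hypothetical stand-in for a RequestContext carrying metrics.
    def to_dict(self) -> dict[str, float]:
        return {"duration_ms": 412.0, "cost_usd": 0.0031}


async def main() -> None:
    formatter = JSONFormatter()
    await formatter.log_response(
        request_id="req-1",
        status=200,
        headers={"content-type": "application/json"},
        body=b'{"id": "msg_1", "usage": {"output_tokens": 12}}',
        context=_StubContext(),
    )


asyncio.run(main())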

log_stream_chunk async

log_stream_chunk(request_id, chunk, chunk_number)

Record individual stream chunk (optional, for deep debugging).

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_stream_chunk(
    self, request_id: str, chunk: bytes, chunk_number: int
) -> None:
    """Record individual stream chunk (optional, for deep debugging)."""
    logger.debug(
        "stream_chunk",
        category="streaming",
        request_id=request_id,
        chunk_number=chunk_number,
        chunk_size=len(chunk),
    )

log_error async

log_error(request_id, error, duration=None, provider=None)

Log error information.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_error(
    self,
    request_id: str,
    error: Exception | None,
    duration: float | None = None,
    provider: str | None = None,
) -> None:
    """Log error information."""
    if not self.verbose_api:
        return

    error_data: dict[str, Any] = {
        "request_id": request_id,
        "error": str(error) if error else "unknown",
        "category": "error",
    }

    if duration is not None:
        error_data["duration"] = duration
    if provider:
        error_data["provider"] = provider

    logger.error("request_error", **error_data)

log_provider_request async

log_provider_request(
    request_id, provider, method, url, headers, body
)

Log provider request.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_provider_request(
    self,
    request_id: str,
    provider: str,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes | None,
) -> None:
    """Log provider request."""
    await self.log_request(
        request_id=request_id,
        method=method,
        url=url,
        headers=headers,
        body=body,
        request_type="provider",
    )

log_provider_response async

log_provider_response(
    request_id, provider, status_code, headers, body
)

Log provider response.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_provider_response(
    self,
    request_id: str,
    provider: str,
    status_code: int,
    headers: dict[str, str],
    body: bytes | None,
) -> None:
    """Log provider response."""
    await self.log_response(
        request_id=request_id,
        status=status_code,
        headers=headers,
        body=body or b"",
        response_type="provider",
    )

log_stream_start async

log_stream_start(request_id, provider=None)

Log stream start.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_stream_start(
    self,
    request_id: str,
    provider: str | None = None,
) -> None:
    """Log stream start."""
    if not self.verbose_api:
        return

    log_data: dict[str, Any] = {
        "request_id": request_id,
        "category": "streaming",
    }
    if provider:
        log_data["provider"] = provider

    logger.info("stream_start", **log_data)

log_stream_complete async

log_stream_complete(
    request_id,
    provider=None,
    total_chunks=None,
    total_bytes=None,
    usage_metrics=None,
)

Log stream completion with metrics.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/json.py
async def log_stream_complete(
    self,
    request_id: str,
    provider: str | None = None,
    total_chunks: int | None = None,
    total_bytes: int | None = None,
    usage_metrics: dict[str, Any] | None = None,
) -> None:
    """Log stream completion with metrics."""
    if not self.verbose_api:
        return

    log_data: dict[str, Any] = {
        "request_id": request_id,
        "category": "streaming",
    }
    if provider:
        log_data["provider"] = provider
    if total_chunks is not None:
        log_data["total_chunks"] = total_chunks
    if total_bytes is not None:
        log_data["total_bytes"] = total_bytes
    if usage_metrics:
        log_data["usage_metrics"] = usage_metrics

    logger.info("stream_complete", **log_data)

RawHTTPFormatter

RawHTTPFormatter(
    log_dir="/tmp/ccproxy/traces",
    enabled=True,
    log_client_request=True,
    log_client_response=True,
    log_provider_request=True,
    log_provider_response=True,
    max_body_size=10485760,
    exclude_headers=None,
)

Formats and logs raw HTTP protocol data.

Parameters:

Name                    Type               Description                        Default
log_dir                 str                Directory for raw HTTP log files   '/tmp/ccproxy/traces'
enabled                 bool               Enable raw HTTP logging            True
log_client_request      bool               Log client requests                True
log_client_response     bool               Log client responses               True
log_provider_request    bool               Log provider requests              True
log_provider_response   bool               Log provider responses             True
max_body_size           int                Maximum body size to log           10485760
exclude_headers         list[str] | None   Headers to redact                  None

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
def __init__(
    self,
    log_dir: str = "/tmp/ccproxy/traces",
    enabled: bool = True,
    log_client_request: bool = True,
    log_client_response: bool = True,
    log_provider_request: bool = True,
    log_provider_response: bool = True,
    max_body_size: int = 10485760,  # 10MB
    exclude_headers: list[str] | None = None,
) -> None:
    """Initialize with configuration.

    Args:
        log_dir: Directory for raw HTTP log files
        enabled: Enable raw HTTP logging
        log_client_request: Log client requests
        log_client_response: Log client responses
        log_provider_request: Log provider requests
        log_provider_response: Log provider responses
        max_body_size: Maximum body size to log
        exclude_headers: Headers to redact
    """
    self.enabled = enabled
    self.log_dir = Path(log_dir)
    self._log_client_request = log_client_request
    self._log_client_response = log_client_response
    self._log_provider_request = log_provider_request
    self._log_provider_response = log_provider_response
    self.max_body_size = max_body_size
    self.exclude_headers = [
        h.lower()
        for h in (
            exclude_headers
            or ["authorization", "x-api-key", "cookie", "x-auth-token"]
        )
    ]

    if self.enabled:
        # Create log directory if it doesn't exist
        try:
            self.log_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            logger.error(
                "failed_to_create_raw_log_directory",
                log_dir=str(self.log_dir),
                error=str(e),
                exc_info=e,
            )
            # Disable logging if we can't create the directory
            self.enabled = False

    # Track which files we've already created (for logging purposes only)
    self._created_files: set[str] = set()
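
A minimal construction sketch. Note that exclude_headers replaces the default redaction list ("authorization", "x-api-key", "cookie", "x-auth-token") rather than extending it, and the log directory is created eagerly; if that fails, the formatter disables itself.

from ccproxy.core.plugins.hooks.implementations.formatters.raw import RawHTTPFormatter

formatter = RawHTTPFormatter(
    log_dir="/tmp/ccproxy/traces",
    max_body_size=1_000_000,  # cap each logged payload at ~1 MB
    exclude_headers=["authorization", "x-api-key", "proxy-authorization"],
)
assert formatter.should_log()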

from_config classmethod

from_config(config)

Create RawHTTPFormatter from a RequestTracerConfig.

Parameters:

Name     Type   Description                    Default
config   Any    RequestTracerConfig instance   required

Returns:

Type               Description
RawHTTPFormatter   RawHTTPFormatter instance

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
@classmethod
def from_config(cls, config: Any) -> "RawHTTPFormatter":
    """Create RawHTTPFormatter from a RequestTracerConfig.

    Args:
        config: RequestTracerConfig instance

    Returns:
        RawHTTPFormatter instance
    """
    return cls(
        log_dir=config.get_raw_log_dir(),
        enabled=config.raw_http_enabled,
        log_client_request=config.log_client_request,
        log_client_response=config.log_client_response,
        log_provider_request=config.log_provider_request,
        log_provider_response=config.log_provider_response,
        max_body_size=config.max_body_size,
        exclude_headers=config.exclude_headers,
    )

should_log

should_log()

Check if raw logging is enabled.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
def should_log(self) -> bool:
    """Check if raw logging is enabled."""
    return bool(self.enabled)

log_client_request async

log_client_request(request_id, raw_data, hook_type=None)

Log raw client request data.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
async def log_client_request(
    self, request_id: str, raw_data: bytes, hook_type: str | None = None
) -> None:
    """Log raw client request data."""
    if not self.enabled or not self._log_client_request:
        return

    # Truncate if too large
    if len(raw_data) > self.max_body_size:
        raw_data = raw_data[: self.max_body_size] + b"\n[TRUNCATED]"

    base_id = self._compose_file_id_with_timestamp(request_id)
    base_suffix = "client_request"
    if hook_type:
        file_suffix = f"{base_suffix}_{hook_type}"
    else:
        file_suffix = base_suffix
    file_path = self.log_dir / f"{base_id}_{file_suffix}.http"

    # Log file creation (only once per unique file path)
    if str(file_path) not in self._created_files:
        self._created_files.add(str(file_path))
        logger.debug(
            "raw_http_log_created",
            request_id=request_id,
            log_type="client_request",
            file_path=str(file_path),
            category="raw_formatter",
        )

    # Write data to file (append mode for multiple chunks)
    async with aiofiles.open(file_path, "ab") as f:
        await f.write(raw_data)
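
Payloads larger than max_body_size are truncated and terminated with a [TRUNCATED] marker before being appended to the per-request .http file. A sketch with an artificially small limit:

import asyncio

from ccproxy.core.plugins.hooks.implementations.formatters.raw import RawHTTPFormatter


async def main() -> None:
    formatter = RawHTTPFormatter(log_dir="/tmp/ccproxy/traces", max_body_size=64)

    raw = (
        b"POST /v1/messages HTTP/1.1\r\n"
        b"Host: api.example.com\r\n\r\n" + b"x" * 200
    )
    # raw exceeds max_body_size, so the written file ends with b"\n[TRUNCATED]".
    await formatter.log_client_request("req-1", raw, hook_type="http")


asyncio.run(main())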

log_client_response async

log_client_response(request_id, raw_data, hook_type=None)

Log raw client response data.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
async def log_client_response(
    self, request_id: str, raw_data: bytes, hook_type: str | None = None
) -> None:
    """Log raw client response data."""
    if not self.enabled or not self._log_client_response:
        return

    # Truncate if too large
    if len(raw_data) > self.max_body_size:
        raw_data = raw_data[: self.max_body_size] + b"\n[TRUNCATED]"

    base_id = self._compose_file_id_with_timestamp(request_id)
    base_suffix = "client_response"
    if hook_type:
        file_suffix = f"{base_suffix}_{hook_type}"
    else:
        file_suffix = base_suffix
    file_path = self.log_dir / f"{base_id}_{file_suffix}.http"

    # Log file creation (only once per unique file path)
    if str(file_path) not in self._created_files:
        self._created_files.add(str(file_path))
        logger.debug(
            "raw_http_log_created",
            request_id=request_id,
            log_type="client_response",
            file_path=str(file_path),
            category="raw_formatter",
            length=len(raw_data),
        )

    # Write data to file
    logger.debug("open_file_", length=len(raw_data), file_path=str(file_path))

    # Note: the async file write was observed to only create the file without
    # writing data; it seemed to block the event loop and prevent the following
    # hook (e.g. request.completed) from executing. A synchronous write seemed
    # to solve the issue:
    # with Path(file_path).open("ab") as sync_f:
    #     sync_f.write(raw_data)
    async with aiofiles.open(file_path, "wb") as f:
        logger.debug("writing_raw_data", length=len(raw_data))
        await f.write(raw_data)

    logger.debug("finish_to_write", length=len(raw_data), file_path=str(file_path))

log_provider_request async

log_provider_request(request_id, raw_data, hook_type=None)

Log raw provider request data.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
async def log_provider_request(
    self, request_id: str, raw_data: bytes, hook_type: str | None = None
) -> None:
    """Log raw provider request data."""
    if not self.enabled or not self._log_provider_request:
        return

    # Truncate if too large
    if len(raw_data) > self.max_body_size:
        raw_data = raw_data[: self.max_body_size] + b"\n[TRUNCATED]"

    base_id = self._compose_file_id_with_timestamp(request_id)
    base_suffix = "provider_request"
    if hook_type:
        file_suffix = f"{base_suffix}_{hook_type}"
    else:
        file_suffix = base_suffix
    file_path = self.log_dir / f"{base_id}_{file_suffix}.http"

    # Log file creation (only once per unique file path)
    if str(file_path) not in self._created_files:
        self._created_files.add(str(file_path))
        logger.debug(
            "raw_http_log_created",
            request_id=request_id,
            log_type="provider_request",
            file_path=str(file_path),
            category="raw_formatter",
        )

    async with aiofiles.open(file_path, "ab") as f:
        await f.write(raw_data)

log_provider_response async

log_provider_response(request_id, raw_data, hook_type=None)

Log raw provider response data.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
async def log_provider_response(
    self, request_id: str, raw_data: bytes, hook_type: str | None = None
) -> None:
    """Log raw provider response data."""
    if not self.enabled or not self._log_provider_response:
        return

    # Truncate if too large
    if len(raw_data) > self.max_body_size:
        raw_data = raw_data[: self.max_body_size] + b"\n[TRUNCATED]"

    base_id = self._compose_file_id_with_timestamp(request_id)
    base_suffix = "provider_response"
    if hook_type:
        file_suffix = f"{base_suffix}_{hook_type}"
    else:
        file_suffix = base_suffix
    file_path = self.log_dir / f"{base_id}_{file_suffix}.http"

    # Log file creation (only once per unique file path)
    if str(file_path) not in self._created_files:
        self._created_files.add(str(file_path))
        logger.debug(
            "raw_http_log_created",
            request_id=request_id,
            log_type="provider_response",
            file_path=str(file_path),
            category="raw_formatter",
        )

    # Write data to file (append mode for multiple chunks)
    async with aiofiles.open(file_path, "ab") as f:
        await f.write(raw_data)

build_raw_request

build_raw_request(method, url, headers, body=None)

Build raw HTTP/1.1 request format.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
def build_raw_request(
    self,
    method: str,
    url: str,
    headers: Sequence[tuple[bytes | str, bytes | str]],
    body: bytes | None = None,
) -> bytes:
    """Build raw HTTP/1.1 request format."""
    # Parse URL to get path
    from urllib.parse import urlparse

    parsed = urlparse(url)
    path = parsed.path or "/"
    if parsed.query:
        path += f"?{parsed.query}"

    # Build request line
    lines = [f"{method} {path} HTTP/1.1"]

    # # Add Host header if not present
    # has_host = any(
    #     (
    #         h[0].lower() == b"host"
    #         if isinstance(h[0], bytes)
    #         else h[0].lower() == "host"
    #     )
    #     for h in headers
    # )
    # if not has_host and parsed.netloc:
    #     lines.append(f"Host: {parsed.netloc}")
    #
    # Add headers with optional redaction
    for name, value in headers:
        if isinstance(name, bytes):
            name = name.decode("ascii", errors="ignore")
        if isinstance(value, bytes):
            value = value.decode("ascii", errors="ignore")

        # Check if header should be redacted
        if name.lower() in self.exclude_headers:
            lines.append(f"{name}: [REDACTED]")
        else:
            lines.append(f"{name}: {value}")

    # Build raw request
    raw = "\r\n".join(lines).encode("utf-8")
    raw += b"\r\n\r\n"

    # Add body if present
    if body:
        raw += body

    return raw
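
build_raw_request produces the wire-format bytes that the log_* methods above expect, redacting any header named in exclude_headers. A combined sketch with illustrative values:

import asyncio

from ccproxy.core.plugins.hooks.implementations.formatters.raw import RawHTTPFormatter


async def main() -> None:
    formatter = RawHTTPFormatter()

    raw = formatter.build_raw_request(
        method="POST",
        url="https://api.example.com/v1/messages?beta=true",
        headers=[
            ("Host", "api.example.com"),
            ("Authorization", "Bearer sk-secret"),  # redacted in the output
            ("Content-Type", "application/json"),
        ],
        body=b'{"model": "example"}',
    )
    # raw begins with b"POST /v1/messages?beta=true HTTP/1.1\r\n..."
    await formatter.log_provider_request("req-1", raw)


asyncio.run(main())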

build_raw_response

build_raw_response(status_code, headers, reason='OK')

Build raw HTTP/1.1 response headers.

Source code in ccproxy/core/plugins/hooks/implementations/formatters/raw.py
def build_raw_response(
    self,
    status_code: int,
    headers: Sequence[tuple[bytes | str, bytes | str]],
    reason: str = "OK",
) -> bytes:
    """Build raw HTTP/1.1 response headers."""
    # Build status line
    lines = [f"HTTP/1.1 {status_code} {reason}"]

    # Add headers with optional redaction
    for name, value in headers:
        if isinstance(name, bytes):
            name = name.decode("ascii", errors="ignore")
        if isinstance(value, bytes):
            value = value.decode("ascii", errors="ignore")

        # Check if header should be redacted
        if name.lower() in self.exclude_headers:
            lines.append(f"{name}: [REDACTED]")
        else:
            lines.append(f"{name}: {value}")

    # Build raw response headers
    raw = "\r\n".join(lines).encode("utf-8")
    raw += b"\r\n\r\n"

    return raw
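
build_raw_response returns only the status line and headers, so a body has to be appended separately if it should appear in the trace. For example:

from ccproxy.core.plugins.hooks.implementations.formatters.raw import RawHTTPFormatter

formatter = RawHTTPFormatter()

head = formatter.build_raw_response(
    status_code=200,
    headers=[("Content-Type", "application/json"), ("Set-Cookie", "session=abc")],
)
raw = head + b'{"ok": true}'
# "set-cookie" is not in the default exclude_headers list, so its value is
# kept; pass exclude_headers explicitly to redact it as well.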