Skip to content

ccproxy.http.client

ccproxy.http.client

Centralized HTTP client configuration and abstractions for CCProxy.

This module provides: - HTTP client factory with optimized configuration for proxy use cases - Generic HTTP client abstractions for pure forwarding without business logic - Lifecycle managed by the ServiceContainer

HTTPError

HTTPError(message, status_code=None)

Bases: Exception

Base exception for HTTP client errors.

Parameters:

Name Type Description Default
message str

Error message

required
status_code int | None

HTTP status code (optional)

None
Source code in ccproxy/http/client.py
def __init__(self, message: str, status_code: int | None = None) -> None:
    """Initialize HTTP error.

    Args:
        message: Error message
        status_code: HTTP status code (optional)
    """
    super().__init__(message)
    self.status_code = status_code

HTTPTimeoutError

HTTPTimeoutError(message='Request timed out')

Bases: HTTPError

Exception raised when HTTP request times out.

Parameters:

Name Type Description Default
message str

Error message

'Request timed out'
Source code in ccproxy/http/client.py
def __init__(self, message: str = "Request timed out") -> None:
    """Initialize timeout error.

    Args:
        message: Error message
    """
    super().__init__(message, status_code=408)

HTTPConnectionError

HTTPConnectionError(message='Connection failed')

Bases: HTTPError

Exception raised when HTTP connection fails.

Parameters:

Name Type Description Default
message str

Error message

'Connection failed'
Source code in ccproxy/http/client.py
def __init__(self, message: str = "Connection failed") -> None:
    """Initialize connection error.

    Args:
        message: Error message
    """
    super().__init__(message, status_code=503)

HTTPClientFactory

Factory for creating optimized HTTP clients.

Provides centralized configuration for HTTP clients with: - Consistent timeout/retry configuration - Unified connection limits - HTTP/2 multiplexing for non-streaming endpoints - Centralized observability hooks (via HookableHTTPClient)

create_client staticmethod

create_client(
    *,
    settings=None,
    timeout_connect=5.0,
    timeout_read=240.0,
    max_keepalive_connections=100,
    max_connections=1000,
    http2=True,
    verify=True,
    hook_manager=None,
    **kwargs,
)

Create an optimized HTTP client with recommended configuration.

Parameters:

Name Type Description Default
settings Settings | None

Optional settings object for additional configuration

None
timeout_connect float

Connection timeout in seconds

5.0
timeout_read float

Read timeout in seconds (long for streaming)

240.0
max_keepalive_connections int

Max keep-alive connections for reuse

100
max_connections int

Max total concurrent connections

1000
http2 bool

Enable HTTP/2 multiplexing

True
verify bool | str

SSL verification (True/False or path to CA bundle)

True
hook_manager Any | None

Optional HookManager for request/response interception

None
**kwargs Any

Additional httpx.AsyncClient arguments

{}

Returns:

Type Description
AsyncClient

Configured httpx.AsyncClient instance

Source code in ccproxy/http/client.py
@staticmethod
def create_client(
    *,
    settings: Settings | None = None,
    timeout_connect: float = 5.0,
    timeout_read: float = 240.0,  # Long timeout for streaming
    max_keepalive_connections: int = 100,  # For non-streaming endpoints
    max_connections: int = 1000,  # High limit for concurrent streams
    http2: bool = True,  # Enable multiplexing (requires httpx[http2])
    verify: bool | str = True,
    hook_manager: Any | None = None,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Create an optimized HTTP client with recommended configuration.

    Args:
        settings: Optional settings object for additional configuration
        timeout_connect: Connection timeout in seconds
        timeout_read: Read timeout in seconds (long for streaming)
        max_keepalive_connections: Max keep-alive connections for reuse
        max_connections: Max total concurrent connections
        http2: Enable HTTP/2 multiplexing
        verify: SSL verification (True/False or path to CA bundle)
        hook_manager: Optional HookManager for request/response interception
        **kwargs: Additional httpx.AsyncClient arguments

    Returns:
        Configured httpx.AsyncClient instance
    """
    # Get proxy configuration from environment
    proxy = get_proxy_url()

    # Get SSL context configuration
    if isinstance(verify, bool) and verify:
        verify = get_ssl_context()

    # Create timeout configuration
    timeout = httpx.Timeout(
        connect=timeout_connect,
        read=timeout_read,
        write=30.0,  # Write timeout
        pool=30.0,  # Pool timeout
    )

    # Create connection limits
    limits = httpx.Limits(
        max_keepalive_connections=max_keepalive_connections,
        max_connections=max_connections,
    )

    # Create transport
    transport = httpx.AsyncHTTPTransport(
        limits=limits,
        http2=http2,
        verify=verify,
        proxy=proxy,
    )

    # Note: Transport wrapping for logging is now handled by the raw_http_logger plugin

    # Handle compression settings
    default_headers = {}
    if settings and hasattr(settings, "http"):
        http_settings = settings.http
        if not http_settings.compression_enabled:
            # Disable compression by setting identity encoding
            # "identity" means no compression
            default_headers["accept-encoding"] = "identity"
        elif http_settings.accept_encoding:
            # Use custom Accept-Encoding value
            default_headers["accept-encoding"] = http_settings.accept_encoding
        # else: let httpx use its default compression handling
    else:
        logger.warning(
            "http_settings_not_found", settings_present=settings is not None
        )

    # Merge headers with any provided in kwargs
    if "headers" in kwargs:
        default_headers.update(kwargs["headers"])
        kwargs["headers"] = default_headers
    elif default_headers:
        kwargs["headers"] = default_headers

    # Merge with any additional kwargs
    client_config = {
        "timeout": timeout,
        "transport": transport,
        **kwargs,
    }

    # Determine effective compression status
    compression_status = "httpx default"
    if "accept-encoding" in default_headers:
        if default_headers["accept-encoding"] == "identity":
            compression_status = "disabled"
        else:
            compression_status = default_headers["accept-encoding"]

    logger.debug(
        "http_client_created",
        timeout_connect=timeout_connect,
        timeout_read=timeout_read,
        max_keepalive_connections=max_keepalive_connections,
        max_connections=max_connections,
        http2=http2,
        has_proxy=proxy is not None,
        has_hooks=hook_manager is not None,
        compression_enabled=settings.http.compression_enabled
        if settings and hasattr(settings, "http")
        else True,
        accept_encoding=compression_status,
    )

    # Create client with or without hook support
    if hook_manager:
        return HookableHTTPClient(hook_manager=hook_manager, **client_config)
    else:
        return httpx.AsyncClient(**client_config)

create_shared_client staticmethod

create_shared_client(settings=None)

Create an optimized HTTP client.

Prefer managing lifecycle via ServiceContainer + HTTPPoolManager. Kept for compatibility with existing factory call sites.

Source code in ccproxy/http/client.py
@staticmethod
def create_shared_client(settings: Settings | None = None) -> httpx.AsyncClient:
    """Create an optimized HTTP client.

    Prefer managing lifecycle via ServiceContainer + HTTPPoolManager.
    Kept for compatibility with existing factory call sites.
    """
    return HTTPClientFactory.create_client(settings=settings)

create_short_lived_client staticmethod

create_short_lived_client(timeout=15.0, **kwargs)

Create a client for short-lived operations like version checks.

Parameters:

Name Type Description Default
timeout float

Short timeout for quick operations

15.0
**kwargs Any

Additional client configuration

{}

Returns:

Type Description
AsyncClient

Configured httpx.AsyncClient instance for short operations

Source code in ccproxy/http/client.py
@staticmethod
def create_short_lived_client(
    timeout: float = 15.0,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Create a client for short-lived operations like version checks.

    Args:
        timeout: Short timeout for quick operations
        **kwargs: Additional client configuration

    Returns:
        Configured httpx.AsyncClient instance for short operations
    """
    return HTTPClientFactory.create_client(
        timeout_connect=5.0,
        timeout_read=timeout,
        max_keepalive_connections=10,
        max_connections=50,
        **kwargs,
    )

managed_client async staticmethod

managed_client(settings=None, **kwargs)

Create a managed HTTP client with automatic cleanup.

This context manager ensures proper cleanup of HTTP clients in error cases and provides a clean resource management pattern.

Parameters:

Name Type Description Default
settings Settings | None

Optional settings for configuration

None
**kwargs Any

Additional client configuration

{}

Yields:

Type Description
AsyncGenerator[AsyncClient, None]

Configured httpx.AsyncClient instance

Example

async with HTTPClientFactory.managed_client() as client: response = await client.get("https://api.example.com")

Source code in ccproxy/http/client.py
@staticmethod
@asynccontextmanager
async def managed_client(
    settings: Settings | None = None, **kwargs: Any
) -> AsyncGenerator[httpx.AsyncClient, None]:
    """Create a managed HTTP client with automatic cleanup.

    This context manager ensures proper cleanup of HTTP clients
    in error cases and provides a clean resource management pattern.

    Args:
        settings: Optional settings for configuration
        **kwargs: Additional client configuration

    Yields:
        Configured httpx.AsyncClient instance

    Example:
        async with HTTPClientFactory.managed_client() as client:
            response = await client.get("https://api.example.com")
    """
    client = HTTPClientFactory.create_client(settings=settings, **kwargs)
    try:
        logger.debug("managed_http_client_created")
        yield client
    finally:
        try:
            await client.aclose()
            logger.debug("managed_http_client_closed")
        except Exception as e:
            logger.warning(
                "managed_http_client_close_failed",
                error=str(e),
                exc_info=e,
            )

get_proxy_url

get_proxy_url()

Get proxy URL from environment variables.

Returns:

Type Description
str | None

str or None: Proxy URL if any proxy is set

Source code in ccproxy/http/client.py
def get_proxy_url() -> str | None:
    """Get proxy URL from environment variables.

    Returns:
        str or None: Proxy URL if any proxy is set
    """
    # Check for standard proxy environment variables
    # For HTTPS requests, prioritize HTTPS_PROXY
    https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
    all_proxy = os.environ.get("ALL_PROXY")
    http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")

    proxy_url = https_proxy or all_proxy or http_proxy

    if proxy_url:
        logger.debug(
            "proxy_configured",
            proxy_url=proxy_url,
            operation="get_proxy_url",
        )

    return proxy_url

get_ssl_context

get_ssl_context()

Get SSL context configuration from environment variables.

Returns:

Type Description
str | bool

SSL verification configuration:

str | bool
  • Path to CA bundle file
str | bool
  • True for default verification
str | bool
  • False to disable verification (insecure)
Source code in ccproxy/http/client.py
def get_ssl_context() -> str | bool:
    """Get SSL context configuration from environment variables.

    Returns:
        SSL verification configuration:
        - Path to CA bundle file
        - True for default verification
        - False to disable verification (insecure)
    """
    # Check for custom CA bundle
    ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("SSL_CERT_FILE")

    # Check if SSL verification should be disabled (NOT RECOMMENDED)
    ssl_verify = os.environ.get("SSL_VERIFY", "true").lower()

    if ca_bundle and Path(ca_bundle).exists():
        logger.debug(
            "ssl_ca_bundle_configured",
            ca_bundle_path=ca_bundle,
            operation="get_ssl_context",
        )
        return ca_bundle
    elif ssl_verify in ("false", "0", "no"):
        logger.warning(
            "ssl_verification_disabled",
            ssl_verify_value=ssl_verify,
            operation="get_ssl_context",
            security_warning=True,
        )
        return False
    else:
        return True