ccproxy.http

HTTP package for CCProxy - consolidated HTTP functionality.

BaseHTTPHandler

Bases: ABC

Abstract base class for HTTP handlers with common functionality.

handle_request abstractmethod async

handle_request(
    method, url, headers, body, handler_config, **kwargs
)

Handle an HTTP request.

Parameters:

    method (str): HTTP method. Required.
    url (str): Target URL. Required.
    headers (dict[str, str]): Request headers. Required.
    body (bytes): Request body. Required.
    handler_config (HandlerConfig): Handler configuration. Required.
    **kwargs (Any): Additional handler-specific arguments. Default: {}.

Returns:

    Response | StreamingResponse | DeferredStreaming: Response or StreamingResponse.

Source code in ccproxy/http/base.py
@abstractmethod
async def handle_request(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    handler_config: HandlerConfig,
    **kwargs: Any,
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle an HTTP request.

    Args:
        method: HTTP method
        url: Target URL
        headers: Request headers
        body: Request body
        handler_config: Handler configuration
        **kwargs: Additional handler-specific arguments

    Returns:
        Response or StreamingResponse
    """
    pass

prepare_request abstractmethod async

prepare_request(request_body, handler_config, **kwargs)

Prepare request for sending.

Parameters:

    request_body (bytes): Original request body. Required.
    handler_config (HandlerConfig): Handler configuration. Required.
    **kwargs (Any): Additional preparation parameters. Default: {}.

Returns:

    tuple[bytes, dict[str, str], bool]: Tuple of (transformed_body, headers, is_streaming).

Source code in ccproxy/http/base.py
@abstractmethod
async def prepare_request(
    self,
    request_body: bytes,
    handler_config: HandlerConfig,
    **kwargs: Any,
) -> tuple[bytes, dict[str, str], bool]:
    """Prepare request for sending.

    Args:
        request_body: Original request body
        handler_config: Handler configuration
        **kwargs: Additional preparation parameters

    Returns:
        Tuple of (transformed_body, headers, is_streaming)
    """
    pass

cleanup async

cleanup()

Cleanup handler resources.

Default implementation does nothing. Override in subclasses if cleanup is needed.

Source code in ccproxy/http/base.py
async def cleanup(self) -> None:
    """Cleanup handler resources.

    Default implementation does nothing.
    Override in subclasses if cleanup is needed.
    """
    return None
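
The sketch below shows what a minimal concrete handler could look like. It is illustrative only: the import location of BaseHTTPHandler and HandlerConfig (ccproxy/http/base.py, per the source notes above) and the pass-through behaviour are assumptions, and a real handler would adapt the upstream result into the Response types named in handle_request's signature.

# Illustrative pass-through handler; import paths and return type are assumptions.
from typing import Any

import httpx

from ccproxy.http.base import BaseHTTPHandler, HandlerConfig  # assumed location


class PassthroughHandler(BaseHTTPHandler):
    """Forward requests unchanged through a shared httpx.AsyncClient."""

    def __init__(self, client: httpx.AsyncClient) -> None:
        self._client = client

    async def prepare_request(
        self, request_body: bytes, handler_config: HandlerConfig, **kwargs: Any
    ) -> tuple[bytes, dict[str, str], bool]:
        # No transformation: keep the body, add no headers, and do not stream.
        return request_body, {}, False

    async def handle_request(
        self,
        method: str,
        url: str,
        headers: dict[str, str],
        body: bytes,
        handler_config: HandlerConfig,
        **kwargs: Any,
    ) -> Any:
        # A real handler would convert this into Response / StreamingResponse.
        return await self._client.request(method, url, headers=headers, content=body)

    async def cleanup(self) -> None:
        await self._client.aclose()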

HTTPClientFactory

Factory for creating optimized HTTP clients.

Provides centralized configuration for HTTP clients with:

- Consistent timeout/retry configuration
- Unified connection limits
- HTTP/2 multiplexing for non-streaming endpoints
- Centralized observability hooks (via HookableHTTPClient)

create_client staticmethod

create_client(
    *,
    settings=None,
    timeout_connect=5.0,
    timeout_read=240.0,
    max_keepalive_connections=100,
    max_connections=1000,
    http2=True,
    verify=True,
    hook_manager=None,
    **kwargs,
)

Create an optimized HTTP client with recommended configuration.

Parameters:

    settings (Settings | None): Optional settings object for additional configuration. Default: None.
    timeout_connect (float): Connection timeout in seconds. Default: 5.0.
    timeout_read (float): Read timeout in seconds (long for streaming). Default: 240.0.
    max_keepalive_connections (int): Max keep-alive connections for reuse. Default: 100.
    max_connections (int): Max total concurrent connections. Default: 1000.
    http2 (bool): Enable HTTP/2 multiplexing. Default: True.
    verify (bool | str): SSL verification (True/False or path to CA bundle). Default: True.
    hook_manager (Any | None): Optional HookManager for request/response interception. Default: None.
    **kwargs (Any): Additional httpx.AsyncClient arguments. Default: {}.

Returns:

    AsyncClient: Configured httpx.AsyncClient instance.

Source code in ccproxy/http/client.py
@staticmethod
def create_client(
    *,
    settings: Settings | None = None,
    timeout_connect: float = 5.0,
    timeout_read: float = 240.0,  # Long timeout for streaming
    max_keepalive_connections: int = 100,  # For non-streaming endpoints
    max_connections: int = 1000,  # High limit for concurrent streams
    http2: bool = True,  # Enable multiplexing (requires httpx[http2])
    verify: bool | str = True,
    hook_manager: Any | None = None,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Create an optimized HTTP client with recommended configuration.

    Args:
        settings: Optional settings object for additional configuration
        timeout_connect: Connection timeout in seconds
        timeout_read: Read timeout in seconds (long for streaming)
        max_keepalive_connections: Max keep-alive connections for reuse
        max_connections: Max total concurrent connections
        http2: Enable HTTP/2 multiplexing
        verify: SSL verification (True/False or path to CA bundle)
        hook_manager: Optional HookManager for request/response interception
        **kwargs: Additional httpx.AsyncClient arguments

    Returns:
        Configured httpx.AsyncClient instance
    """
    # Get proxy configuration from environment
    proxy = get_proxy_url()

    # Get SSL context configuration
    if isinstance(verify, bool) and verify:
        verify = get_ssl_context()

    # Create timeout configuration
    timeout = httpx.Timeout(
        connect=timeout_connect,
        read=timeout_read,
        write=30.0,  # Write timeout
        pool=30.0,  # Pool timeout
    )

    # Create connection limits
    limits = httpx.Limits(
        max_keepalive_connections=max_keepalive_connections,
        max_connections=max_connections,
    )

    # Create transport
    transport = httpx.AsyncHTTPTransport(
        limits=limits,
        http2=http2,
        verify=verify,
        proxy=proxy,
    )

    # Note: Transport wrapping for logging is now handled by the raw_http_logger plugin

    # Handle compression settings
    default_headers = {}
    if settings and hasattr(settings, "http"):
        http_settings = settings.http
        if not http_settings.compression_enabled:
            # Disable compression by setting identity encoding
            # "identity" means no compression
            default_headers["accept-encoding"] = "identity"
        elif http_settings.accept_encoding:
            # Use custom Accept-Encoding value
            default_headers["accept-encoding"] = http_settings.accept_encoding
        # else: let httpx use its default compression handling
    else:
        logger.warning(
            "http_settings_not_found", settings_present=settings is not None
        )

    # Merge headers with any provided in kwargs
    if "headers" in kwargs:
        default_headers.update(kwargs["headers"])
        kwargs["headers"] = default_headers
    elif default_headers:
        kwargs["headers"] = default_headers

    # Merge with any additional kwargs
    client_config = {
        "timeout": timeout,
        "transport": transport,
        **kwargs,
    }

    # Determine effective compression status
    compression_status = "httpx default"
    if "accept-encoding" in default_headers:
        if default_headers["accept-encoding"] == "identity":
            compression_status = "disabled"
        else:
            compression_status = default_headers["accept-encoding"]

    logger.debug(
        "http_client_created",
        timeout_connect=timeout_connect,
        timeout_read=timeout_read,
        max_keepalive_connections=max_keepalive_connections,
        max_connections=max_connections,
        http2=http2,
        has_proxy=proxy is not None,
        has_hooks=hook_manager is not None,
        compression_enabled=settings.http.compression_enabled
        if settings and hasattr(settings, "http")
        else True,
        accept_encoding=compression_status,
    )

    # Create client with or without hook support
    if hook_manager:
        return HookableHTTPClient(hook_manager=hook_manager, **client_config)
    else:
        return httpx.AsyncClient(**client_config)
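
A short usage sketch for the factory; the URL below is a placeholder and the module path follows the "Source code in ccproxy/http/client.py" note above.

import asyncio

from ccproxy.http.client import HTTPClientFactory


async def main() -> None:
    # Non-streaming use: shorter read timeout, HTTP/2 left on (needs the httpx[http2] extra).
    client = HTTPClientFactory.create_client(timeout_read=60.0, http2=True)
    try:
        response = await client.get("https://api.example.com/health")  # placeholder URL
        print(response.status_code)
    finally:
        await client.aclose()


asyncio.run(main())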

create_shared_client staticmethod

create_shared_client(settings=None)

Create an optimized HTTP client.

Prefer managing lifecycle via ServiceContainer + HTTPPoolManager. Kept for compatibility with existing factory call sites.

Source code in ccproxy/http/client.py
@staticmethod
def create_shared_client(settings: Settings | None = None) -> httpx.AsyncClient:
    """Create an optimized HTTP client.

    Prefer managing lifecycle via ServiceContainer + HTTPPoolManager.
    Kept for compatibility with existing factory call sites.
    """
    return HTTPClientFactory.create_client(settings=settings)

create_short_lived_client staticmethod

create_short_lived_client(timeout=15.0, **kwargs)

Create a client for short-lived operations like version checks.

Parameters:

    timeout (float): Short timeout for quick operations. Default: 15.0.
    **kwargs (Any): Additional client configuration. Default: {}.

Returns:

    AsyncClient: Configured httpx.AsyncClient instance for short operations.

Source code in ccproxy/http/client.py
@staticmethod
def create_short_lived_client(
    timeout: float = 15.0,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Create a client for short-lived operations like version checks.

    Args:
        timeout: Short timeout for quick operations
        **kwargs: Additional client configuration

    Returns:
        Configured httpx.AsyncClient instance for short operations
    """
    return HTTPClientFactory.create_client(
        timeout_connect=5.0,
        timeout_read=timeout,
        max_keepalive_connections=10,
        max_connections=50,
        **kwargs,
    )
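
For quick one-off calls such as a version check, a sketch along these lines could work; the release endpoint is a placeholder.

from ccproxy.http.client import HTTPClientFactory


async def fetch_latest_version() -> str | None:
    # Small pool and a 10 s read timeout: suited to a single quick request.
    client = HTTPClientFactory.create_short_lived_client(timeout=10.0)
    try:
        response = await client.get("https://example.com/releases/latest")  # placeholder URL
        response.raise_for_status()
        version = response.json().get("version")
        return str(version) if version is not None else None
    finally:
        await client.aclose()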

managed_client async staticmethod

managed_client(settings=None, **kwargs)

Create a managed HTTP client with automatic cleanup.

This context manager ensures proper cleanup of HTTP clients in error cases and provides a clean resource management pattern.

Parameters:

    settings (Settings | None): Optional settings for configuration. Default: None.
    **kwargs (Any): Additional client configuration. Default: {}.

Yields:

    AsyncGenerator[AsyncClient, None]: Configured httpx.AsyncClient instance.

Example:

    async with HTTPClientFactory.managed_client() as client:
        response = await client.get("https://api.example.com")

Source code in ccproxy/http/client.py
@staticmethod
@asynccontextmanager
async def managed_client(
    settings: Settings | None = None, **kwargs: Any
) -> AsyncGenerator[httpx.AsyncClient, None]:
    """Create a managed HTTP client with automatic cleanup.

    This context manager ensures proper cleanup of HTTP clients
    in error cases and provides a clean resource management pattern.

    Args:
        settings: Optional settings for configuration
        **kwargs: Additional client configuration

    Yields:
        Configured httpx.AsyncClient instance

    Example:
        async with HTTPClientFactory.managed_client() as client:
            response = await client.get("https://api.example.com")
    """
    client = HTTPClientFactory.create_client(settings=settings, **kwargs)
    try:
        logger.debug("managed_http_client_created")
        yield client
    finally:
        try:
            await client.aclose()
            logger.debug("managed_http_client_closed")
        except Exception as e:
            logger.warning(
                "managed_http_client_close_failed",
                error=str(e),
                exc_info=e,
            )

HTTPConnectionError

HTTPConnectionError(message='Connection failed')

Bases: HTTPError

Exception raised when HTTP connection fails.

Parameters:

    message (str): Error message. Default: 'Connection failed'.

Source code in ccproxy/http/client.py
def __init__(self, message: str = "Connection failed") -> None:
    """Initialize connection error.

    Args:
        message: Error message
    """
    super().__init__(message, status_code=503)

HTTPError

HTTPError(message, status_code=None)

Bases: Exception

Base exception for HTTP client errors.

Parameters:

    message (str): Error message. Required.
    status_code (int | None): HTTP status code (optional). Default: None.

Source code in ccproxy/http/client.py
def __init__(self, message: str, status_code: int | None = None) -> None:
    """Initialize HTTP error.

    Args:
        message: Error message
        status_code: HTTP status code (optional)
    """
    super().__init__(message)
    self.status_code = status_code

HTTPTimeoutError

HTTPTimeoutError(message='Request timed out')

Bases: HTTPError

Exception raised when HTTP request times out.

Parameters:

    message (str): Error message. Default: 'Request timed out'.

Source code in ccproxy/http/client.py
def __init__(self, message: str = "Request timed out") -> None:
    """Initialize timeout error.

    Args:
        message: Error message
    """
    super().__init__(message, status_code=408)
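
The error hierarchy can be handled with ordinary isinstance checks, as in this sketch; the status codes shown match the constructors documented above.

from ccproxy.http.client import HTTPConnectionError, HTTPError, HTTPTimeoutError


def describe(error: HTTPError) -> str:
    # Check the more specific subclasses first; HTTPError carries an optional status_code.
    if isinstance(error, HTTPTimeoutError):
        return f"timed out (status {error.status_code})"           # 408
    if isinstance(error, HTTPConnectionError):
        return f"connection failed (status {error.status_code})"   # 503
    return f"http error (status {error.status_code})"


print(describe(HTTPTimeoutError()))                        # timed out (status 408)
print(describe(HTTPConnectionError()))                     # connection failed (status 503)
print(describe(HTTPError("bad gateway", status_code=502)))  # http error (status 502)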

HookableHTTPClient

HookableHTTPClient(*args, hook_manager=None, **kwargs)

Bases: AsyncClient

HTTP client wrapper that emits hooks for all requests/responses.

Parameters:

    *args (Any): Arguments for httpx.AsyncClient. Default: ().
    hook_manager (Any | None): Optional HookManager instance for emitting hooks. Default: None.
    **kwargs (Any): Keyword arguments for httpx.AsyncClient. Default: {}.

Source code in ccproxy/http/hooks.py
def __init__(self, *args: Any, hook_manager: Any | None = None, **kwargs: Any):
    """Initialize HTTP client with optional hook support.

    Args:
        *args: Arguments for httpx.AsyncClient
        hook_manager: Optional HookManager instance for emitting hooks
        **kwargs: Keyword arguments for httpx.AsyncClient
    """
    super().__init__(*args, **kwargs)
    self.hook_manager = hook_manager
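
The client only needs an object exposing an async emit(event, context) method, so a stand-in hook manager is enough for a sketch; CCProxy's real HookManager lives elsewhere and is not shown here.

from typing import Any

from ccproxy.http.hooks import HookableHTTPClient


class PrintingHookManager:
    """Stand-in hook manager: print each emitted event for inspection."""

    async def emit(self, event: Any, context: dict[str, Any]) -> None:
        print(event, context.get("method"), context.get("url"), context.get("status_code"))


client = HookableHTTPClient(hook_manager=PrintingHookManager(), timeout=30.0)
# client.request(...) and client.stream(...) now emit HTTP_REQUEST,
# HTTP_RESPONSE, and HTTP_ERROR events through the stand-in manager.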

request async

request(
    method,
    url,
    *,
    content=None,
    data=None,
    files=None,
    params=None,
    headers=None,
    json=None,
    **kwargs,
)

Make an HTTP request with hook emissions.

Emits:

- HTTP_REQUEST before sending
- HTTP_RESPONSE after receiving response
- HTTP_ERROR on errors
Source code in ccproxy/http/hooks.py
async def request(
    self,
    method: str,
    url: httpx.URL | str,
    *,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    json: Any | None = None,
    **kwargs: Any,
) -> httpx.Response:
    """Make an HTTP request with hook emissions.

    Emits:
        - HTTP_REQUEST before sending
        - HTTP_RESPONSE after receiving response
        - HTTP_ERROR on errors
    """
    # Build request context for hooks
    request_context: dict[str, Any] = {
        "method": method,
        "url": str(url),
        "headers": dict(self._normalize_header_pairs(headers)),
        "is_provider_request": True,
        "origin": "upstream",
    }

    # Try to get current request ID from RequestContext
    try:
        current_context = RequestContext.get_current()
        if current_context and hasattr(current_context, "request_id"):
            request_context["request_id"] = current_context.request_id
    except Exception:
        # If no request context available, hooks will generate their own ID
        pass

    # Add body information
    if json is not None:
        request_context["body"] = json
        request_context["is_json"] = True
        preview, length, truncated = _stringify_body_for_logging(json)
    elif data is not None:
        request_context["body"] = data
        request_context["is_json"] = False
        preview, length, truncated = _stringify_body_for_logging(data)
    elif content is not None:
        # Handle content parameter - could be bytes, string, or other
        if isinstance(content, bytes | str):
            try:
                if isinstance(content, bytes):
                    content_str = content.decode("utf-8")
                else:
                    content_str = content

                if content_str.strip().startswith(("{", "[")):
                    request_context["body"] = jsonlib.loads(content_str)
                    request_context["is_json"] = True
                else:
                    request_context["body"] = content
                    request_context["is_json"] = False
            except Exception:
                # If parsing fails, just include as-is
                request_context["body"] = content
                request_context["is_json"] = False
        else:
            request_context["body"] = content
            request_context["is_json"] = False
        preview, length, truncated = _stringify_body_for_logging(
            request_context["body"]
        )
    else:
        preview, length, truncated = (None, 0, False)

    start_time = time.perf_counter()

    logger.info(
        "request_started",
        method=method,
        url=str(url),
        request_id=request_context.get("request_id"),
        origin=request_context.get("origin"),
        has_body=preview is not None,
        body_size=length,
        body_truncated=truncated,
        is_json=request_context.get("is_json", False),
        streaming=False,
        category="http",
    )

    logger.debug(
        "upstream_http_request",
        method=method,
        url=str(url),
        request_id=request_context.get("request_id"),
        body_preview=preview,
        body_size=length,
        body_truncated=truncated,
        is_json=request_context.get("is_json", False),
        category="http",
    )

    # Emit pre-request hook
    if self.hook_manager:
        try:
            await self.hook_manager.emit(
                HookEvent.HTTP_REQUEST,
                request_context,
            )
        except Exception as e:
            logger.debug(
                "http_request_hook_error",
                error=str(e),
                method=method,
                url=str(url),
            )

    final_response: httpx.Response | None = None

    try:
        response = await super().request(
            method,
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            **kwargs,
        )
        final_response = response

        if self.hook_manager:
            # Read response content FIRST before any other processing
            response_content = response.content

            response_context = {
                **request_context,
                "status_code": response.status_code,
                "response_headers": extract_response_headers(response),
                "is_provider_response": True,
            }

            # Include response body from the content we just read
            try:
                content_type = response.headers.get("content-type", "")
                if "application/json" in content_type:
                    # Try to parse the raw content as JSON
                    try:
                        response_context["response_body"] = jsonlib.loads(
                            response_content.decode("utf-8")
                        )
                    except Exception:
                        # If JSON parsing fails, include as text
                        response_context["response_body"] = response_content.decode(
                            "utf-8", errors="replace"
                        )
                else:
                    # For non-JSON content, include as text
                    response_context["response_body"] = response_content.decode(
                        "utf-8", errors="replace"
                    )
            except Exception:
                # Last resort - include as bytes
                response_context["response_body"] = response_content

            preview, length, truncated = _stringify_body_for_logging(
                response_context.get("response_body")
            )
            logger.debug(
                "upstream_http_response",
                url=str(url),
                request_id=response_context.get("request_id"),
                status_code=response.status_code,
                body_preview=preview,
                body_size=length,
                body_truncated=truncated,
                category="http",
            )

            try:
                await self.hook_manager.emit(
                    HookEvent.HTTP_RESPONSE,
                    response_context,
                )
            except Exception as e:
                logger.debug(
                    "http_response_hook_error",
                    error=str(e),
                    status_code=response.status_code,
                )

            try:
                recreated_response = httpx.Response(
                    status_code=response.status_code,
                    headers=response.headers,
                    content=response_content,
                    request=response.request,
                )
                final_response = recreated_response
            except Exception:
                # If recreation fails, return original (may have empty body)
                logger.debug("response_recreation_failed")
                final_response = response

        duration_ms = round((time.perf_counter() - start_time) * 1000, 3)

        logger.info(
            "request_completed",
            method=method,
            url=str(url),
            request_id=request_context.get("request_id"),
            origin=request_context.get("origin"),
            status_code=final_response.status_code if final_response else None,
            duration_ms=duration_ms,
            streaming=False,
            success=True,
            category="http",
        )

        assert final_response is not None
        return final_response

    except Exception as error:
        duration_ms = round((time.perf_counter() - start_time) * 1000, 3)

        # Emit error hook
        if self.hook_manager:
            error_context = {
                **request_context,
                "error_type": type(error).__name__,
                "error_detail": str(error),
            }

            # Add response info if it's an HTTPStatusError
            if isinstance(error, httpx.HTTPStatusError):
                error_context["status_code"] = error.response.status_code
                error_context["response_body"] = error.response.text

            try:
                await self.hook_manager.emit(
                    HookEvent.HTTP_ERROR,
                    error_context,
                )
            except Exception as e:
                logger.debug(
                    "http_error_hook_error",
                    error=str(e),
                    original_error=str(error),
                )

        status_code = getattr(getattr(error, "response", None), "status_code", None)

        logger.info(
            "request_completed",
            method=method,
            url=str(url),
            request_id=request_context.get("request_id"),
            origin=request_context.get("origin"),
            status_code=status_code,
            duration_ms=duration_ms,
            streaming=False,
            success=False,
            error_type=type(error).__name__,
            category="http",
        )

        logger.error(
            "upstream_http_error",
            url=str(url),
            request_id=request_context.get("request_id"),
            error_type=type(error).__name__,
            error_detail=str(error),
            category="http",
        )

        # Re-raise the original error
        raise

stream async

stream(
    method,
    url,
    *,
    content=None,
    data=None,
    files=None,
    params=None,
    headers=None,
    json=None,
    **kwargs,
)

Make a streaming HTTP request with hook emissions.

This method emits HTTP hooks for streaming requests, capturing the complete response body while maintaining streaming behavior.

Emits:

- HTTP_REQUEST before sending
- HTTP_RESPONSE after receiving complete response
- HTTP_ERROR on errors
Source code in ccproxy/http/hooks.py
@contextlib.asynccontextmanager
async def stream(
    self,
    method: str,
    url: httpx.URL | str,
    *,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    json: Any | None = None,
    **kwargs: Any,
) -> AsyncIterator[httpx.Response]:
    """Make a streaming HTTP request with hook emissions.

    This method emits HTTP hooks for streaming requests, capturing the complete
    response body while maintaining streaming behavior.

    Emits:
        - HTTP_REQUEST before sending
        - HTTP_RESPONSE after receiving complete response
        - HTTP_ERROR on errors
    """
    # Build request context for hooks (same as request() method)
    request_context: dict[str, Any] = {
        "method": method,
        "url": str(url),
        "headers": dict(self._normalize_header_pairs(headers)),
        "is_provider_request": True,
        "origin": "upstream",
    }

    # Try to get current request ID from RequestContext
    try:
        current_context = RequestContext.get_current()
        if current_context and hasattr(current_context, "request_id"):
            request_context["request_id"] = current_context.request_id
    except Exception:
        # No current context available, that's OK
        pass

    # Add request body to context if available
    if json is not None:
        request_context["body"] = json
        request_context["is_json"] = True
    elif data is not None:
        request_context["body"] = data
        request_context["is_json"] = False
    elif content is not None:
        request_context["body"] = content
        request_context["is_json"] = False

    preview, length, truncated = _stringify_body_for_logging(
        request_context.get("body")
    )
    start_time = time.perf_counter()

    logger.info(
        "sse_connection_started",
        method=method,
        url=str(url),
        request_id=request_context.get("request_id"),
        origin=request_context.get("origin"),
        has_body=preview is not None,
        body_size=length,
        body_truncated=truncated,
        is_json=request_context.get("is_json", False),
        streaming=True,
        category="http",
    )

    logger.debug(
        "upstream_http_request",
        method=method,
        url=str(url),
        request_id=request_context.get("request_id"),
        body_preview=preview,
        body_size=length,
        body_truncated=truncated,
        is_json=request_context.get("is_json", False),
        streaming=True,
        category="http",
    )

    # Emit pre-request hook
    if self.hook_manager:
        try:
            await self.hook_manager.emit(
                HookEvent.HTTP_REQUEST,
                request_context,
            )
        except Exception as e:
            logger.debug(
                "http_request_hook_error",
                error=str(e),
                method=method,
                url=str(url),
            )

    request_error: BaseException | None = None
    status_code: int | None = None
    connection_started = False
    closed_by: str = "server"

    try:
        async with super().stream(
            method=method,
            url=url,
            content=content,
            data=data,
            files=files,
            params=params,
            headers=headers,
            json=json,
            **kwargs,
        ) as response:
            status_code = response.status_code
            connection_started = True

            if self.hook_manager:
                try:
                    response_context = {
                        **request_context,
                        "status_code": response.status_code,
                        "response_headers": extract_response_headers(response),
                        "is_provider_response": True,
                        "streaming": True,
                    }
                    await self.hook_manager.emit(
                        HookEvent.HTTP_RESPONSE,
                        response_context,
                    )
                except Exception as e:
                    logger.debug(
                        "http_response_hook_error",
                        error=str(e),
                        status_code=response.status_code,
                    )

            logger.debug(
                "upstream_http_response",
                url=str(url),
                request_id=request_context.get("request_id"),
                status_code=response.status_code,
                streaming=True,
                body_preview=None,
                body_size=0,
                body_truncated=False,
                category="http",
            )
            yield response

    except asyncio.CancelledError as error:
        request_error = error
        closed_by = "client"

        # Emit error hook
        if self.hook_manager:
            error_context = {
                **request_context,
                "error": error,
                "error_type": type(error).__name__,
            }

            try:
                await self.hook_manager.emit(
                    HookEvent.HTTP_ERROR,
                    error_context,
                )
            except Exception as e:
                logger.debug(
                    "http_error_hook_error",
                    error=str(e),
                    original_error=str(error),
                )

        logger.debug(
            "upstream_http_cancelled",
            url=str(url),
            request_id=request_context.get("request_id"),
            error_type=type(error).__name__,
            streaming=True,
            category="http",
        )

        raise

    except Exception as error:
        request_error = error
        closed_by = "server_error"

        # Emit error hook
        if self.hook_manager:
            error_context = {
                **request_context,
                "error": error,
                "error_type": type(error).__name__,
            }

            if isinstance(error, httpx.HTTPStatusError):
                error_context["status_code"] = error.response.status_code
                error_context["response_body"] = error.response.text

            try:
                await self.hook_manager.emit(
                    HookEvent.HTTP_ERROR,
                    error_context,
                )
            except Exception as e:
                logger.debug(
                    "http_error_hook_error",
                    error=str(e),
                    original_error=str(error),
                )

        if not isinstance(error, asyncio.CancelledError):
            logger.error(
                "upstream_http_error",
                url=str(url),
                request_id=request_context.get("request_id"),
                error_type=type(error).__name__,
                error_detail=str(error),
                streaming=True,
                category="http",
            )

        raise

    finally:
        duration_ms = round((time.perf_counter() - start_time) * 1000, 3)
        log_fields: dict[str, Any] = {
            "method": method,
            "url": str(url),
            "request_id": request_context.get("request_id"),
            "origin": request_context.get("origin"),
            "status_code": status_code,
            "duration_ms": duration_ms,
            "streaming": True,
            "success": request_error is None and connection_started,
            "closed_by": closed_by if request_error is not None else "server",
            "category": "http",
        }
        stream_metadata = getattr(request_context, "metadata", {})
        closed_override = (
            stream_metadata.get("stream_closed_by") if stream_metadata else None
        )
        if closed_override:
            log_fields["closed_by"] = closed_override
            if closed_override != "server":
                log_fields["success"] = False
        error_reason_override = (
            stream_metadata.get("stream_error_reason") if stream_metadata else None
        )
        error_type_override = (
            stream_metadata.get("stream_error_type") if stream_metadata else None
        )

        if request_error is not None:
            log_fields["error_type"] = type(request_error).__name__
            error_str = str(request_error)
            if error_str:
                log_fields["error_reason"] = error_str
        elif error_type_override:
            log_fields["error_type"] = error_type_override
            if error_reason_override:
                log_fields["error_reason"] = error_reason_override

        if error_reason_override and "error_reason" not in log_fields:
            log_fields["error_reason"] = error_reason_override

        logger.info("sse_connection_ended", **log_fields)

HTTPPoolManager

HTTPPoolManager(settings=None, hook_manager=None)

Manages HTTP connection pools for different base URLs.

This manager ensures that:

- Each unique base URL gets its own optimized connection pool
- Connection pools are reused across all components
- Resources are properly cleaned up on shutdown
- Configuration is consistent across all clients

Parameters:

    settings (Settings | None): Optional application settings for configuration. Default: None.
    hook_manager (Any | None): Optional hook manager for request/response tracing. Default: None.

Source code in ccproxy/http/pool.py
def __init__(
    self, settings: Settings | None = None, hook_manager: Any | None = None
) -> None:
    """Initialize the HTTP pool manager.

    Args:
        settings: Optional application settings for configuration
        hook_manager: Optional hook manager for request/response tracing
    """
    self.settings = settings
    self.hook_manager = hook_manager
    self._pools: dict[str, httpx.AsyncClient] = {}
    self._shared_client: httpx.AsyncClient | None = None
    self._lock = asyncio.Lock()

    logger.trace("http_pool_manager_initialized", category="lifecycle")

get_client async

get_client(
    base_url=None, *, timeout=None, headers=None, **kwargs
)

Get or create an HTTP client for the specified base URL.

Parameters:

    base_url (str | None): Optional base URL for the client. If None, returns the default client. Default: None.
    timeout (float | None): Optional custom timeout for this client. Default: None.
    headers (dict[str, str] | None): Optional default headers for this client. Default: None.
    **kwargs (Any): Additional configuration for the client. Default: {}.

Returns:

    AsyncClient: Configured httpx.AsyncClient instance.

Source code in ccproxy/http/pool.py
async def get_client(
    self,
    base_url: str | None = None,
    *,
    timeout: float | None = None,
    headers: dict[str, str] | None = None,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Get or create an HTTP client for the specified base URL.

    Args:
        base_url: Optional base URL for the client. If None, returns the default client
        timeout: Optional custom timeout for this client
        headers: Optional default headers for this client
        **kwargs: Additional configuration for the client

    Returns:
        Configured httpx.AsyncClient instance
    """
    # If no base URL, return the shared general-purpose client
    if not base_url:
        return await self.get_shared_client()

    # Normalize the base URL to use as a key
    pool_key = self._normalize_base_url(base_url)

    async with self._lock:
        # Check if we already have a client for this base URL
        if pool_key in self._pools:
            logger.trace(
                "reusing_existing_pool",
                base_url=base_url,
                pool_key=pool_key,
                category="lifecycle",
            )
            return self._pools[pool_key]

        # Create a new client for this base URL
        logger.trace(
            "creating_new_pool",
            base_url=base_url,
            pool_key=pool_key,
        )

        # Build client configuration
        client_config: dict[str, Any] = {
            "base_url": base_url,
        }

        if headers:
            client_config["headers"] = headers

        if timeout is not None:
            client_config["timeout_read"] = timeout

        # Merge with any additional kwargs
        client_config.update(kwargs)

        # Create the per-base-URL client via the factory
        client = HTTPClientFactory.create_client(
            settings=self.settings,
            hook_manager=self.hook_manager,
            http2=False,  # HTTP/2 is disabled for pooled clients
            **client_config,
        )

        # Store in the pool
        self._pools[pool_key] = client

        return client
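
A usage sketch for the pool manager; the base URL is a placeholder, and in the application the manager is normally owned by the ServiceContainer rather than created ad hoc.

import asyncio

from ccproxy.http.pool import HTTPPoolManager


async def main() -> None:
    pool = HTTPPoolManager()

    client_a = await pool.get_client("https://api.example.com")  # placeholder base URL
    client_b = await pool.get_client("https://api.example.com")
    assert client_a is client_b  # pools are keyed by the normalized base URL

    shared = await pool.get_client()  # no base URL -> shared general-purpose client
    print(pool.get_pool_stats(), shared is not client_a)

    await pool.close_all()


asyncio.run(main())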

get_shared_client async

get_shared_client()

Get the default general-purpose HTTP client.

This client is used for requests without a specific base URL and is managed by this pool manager for reuse during the app lifetime.

Returns:

    AsyncClient: The default httpx.AsyncClient instance.

Source code in ccproxy/http/pool.py
async def get_shared_client(self) -> httpx.AsyncClient:
    """Get the default general-purpose HTTP client.

    This client is used for requests without a specific base URL and is managed
    by this pool manager for reuse during the app lifetime.

    Returns:
        The default httpx.AsyncClient instance
    """
    async with self._lock:
        if self._shared_client is None:
            logger.trace("default_client_created")
            self._shared_client = HTTPClientFactory.create_client(
                settings=self.settings,
                hook_manager=self.hook_manager,
                http2=False,  # Use HTTP/1.1 for the default client
            )
        return self._shared_client

get_streaming_client async

get_streaming_client(
    base_url=None, *, headers=None, **kwargs
)

Get or create a client optimized for streaming.

Uses a longer read timeout appropriate for SSE/streaming endpoints.

Parameters:

    base_url (str | None): Optional base URL for the client. Default: None.
    headers (dict[str, str] | None): Optional default headers. Default: None.
    **kwargs (Any): Additional client kwargs merged into configuration. Default: {}.

Returns:

    AsyncClient: Configured httpx.AsyncClient instance.

Source code in ccproxy/http/pool.py
async def get_streaming_client(
    self,
    base_url: str | None = None,
    *,
    headers: dict[str, str] | None = None,
    **kwargs: Any,
) -> httpx.AsyncClient:
    """Get or create a client optimized for streaming.

    Uses a longer read timeout appropriate for SSE/streaming endpoints.

    Args:
        base_url: Optional base URL for the client
        headers: Optional default headers
        **kwargs: Additional client kwargs merged into configuration

    Returns:
        Configured httpx.AsyncClient instance
    """
    return await self.get_client(
        base_url=base_url,
        timeout=HTTP_STREAMING_TIMEOUT,
        headers=headers,
        **kwargs,
    )

get_shared_client_sync

get_shared_client_sync()

Get or create the default client synchronously.

This is used during initialization when we're not in an async context. Note: This doesn't use locking, so it should only be called during single-threaded initialization.

Returns:

    AsyncClient: The default httpx.AsyncClient instance.

Source code in ccproxy/http/pool.py
def get_shared_client_sync(self) -> httpx.AsyncClient:
    """Get or create the default client synchronously.

    This is used during initialization when we're not in an async context.
    Note: This doesn't use locking, so it should only be called during
    single-threaded initialization.

    Returns:
        The default httpx.AsyncClient instance
    """
    if self._shared_client is None:
        logger.trace("default_client_created_sync")
        self._shared_client = HTTPClientFactory.create_client(
            settings=self.settings,
            hook_manager=self.hook_manager,
            http2=False,  # Disable HTTP/2 to ensure logging transport works
        )
    return self._shared_client

get_pool_client

get_pool_client(base_url)

Get an existing client for a base URL without creating one.

Parameters:

    base_url (str): The base URL to look up. Required.

Returns:

    AsyncClient | None: Existing client or None if not found.

Source code in ccproxy/http/pool.py
def get_pool_client(self, base_url: str) -> httpx.AsyncClient | None:
    """Get an existing client for a base URL without creating one.

    Args:
        base_url: The base URL to look up

    Returns:
        Existing client or None if not found
    """
    pool_key = self._normalize_base_url(base_url)
    return self._pools.get(pool_key)

close_pool async

close_pool(base_url)

Close and remove a specific connection pool.

Parameters:

    base_url (str): The base URL of the pool to close. Required.

Source code in ccproxy/http/pool.py
async def close_pool(self, base_url: str) -> None:
    """Close and remove a specific connection pool.

    Args:
        base_url: The base URL of the pool to close
    """
    pool_key = self._normalize_base_url(base_url)

    async with self._lock:
        if pool_key in self._pools:
            client = self._pools.pop(pool_key)
            await client.aclose()
            logger.trace(
                "pool_closed",
                base_url=base_url,
                pool_key=pool_key,
            )

close_all async

close_all()

Close all connection pools and clean up resources.

This should be called during application shutdown.

Source code in ccproxy/http/pool.py
async def close_all(self) -> None:
    """Close all connection pools and clean up resources.

    This should be called during application shutdown.
    """
    async with self._lock:
        # Close all URL-specific pools
        for pool_key, client in self._pools.items():
            try:
                await client.aclose()
                logger.trace("pool_closed", pool_key=pool_key)
            except Exception as e:
                logger.error(
                    "pool_close_error",
                    pool_key=pool_key,
                    error=str(e),
                    exc_info=e,
                )

        self._pools.clear()

        # Close the default client
        if self._shared_client:
            try:
                await self._shared_client.aclose()
                logger.trace("default_client_closed")
            except Exception as e:
                logger.error(
                    "default_client_close_error",
                    error=str(e),
                    exc_info=e,
                )
            self._shared_client = None

        logger.trace("all_pools_closed")

get_pool_stats

get_pool_stats()

Get statistics about the current connection pools.

Returns:

    dict[str, Any]: Dictionary with pool statistics.

Source code in ccproxy/http/pool.py
def get_pool_stats(self) -> dict[str, Any]:
    """Get statistics about the current connection pools.

    Returns:
        Dictionary with pool statistics
    """
    return {
        "total_pools": len(self._pools),
        "pool_keys": list(self._pools.keys()),
        "has_default_client": self._shared_client is not None,
    }

get_proxy_url

get_proxy_url()

Get proxy URL from environment variables.

Returns:

    str | None: Proxy URL if any proxy is set.

Source code in ccproxy/http/client.py
def get_proxy_url() -> str | None:
    """Get proxy URL from environment variables.

    Returns:
        str or None: Proxy URL if any proxy is set
    """
    # Check for standard proxy environment variables
    # For HTTPS requests, prioritize HTTPS_PROXY
    https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
    all_proxy = os.environ.get("ALL_PROXY")
    http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")

    proxy_url = https_proxy or all_proxy or http_proxy

    if proxy_url:
        logger.debug(
            "proxy_configured",
            proxy_url=proxy_url,
            operation="get_proxy_url",
        )

    return proxy_url
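
The resolution order is HTTPS_PROXY (or https_proxy), then ALL_PROXY, then HTTP_PROXY (or http_proxy); the proxy addresses below are placeholders.

import os

from ccproxy.http.client import get_proxy_url

os.environ["HTTP_PROXY"] = "http://fallback-proxy.internal:8080"  # placeholder
os.environ["HTTPS_PROXY"] = "http://main-proxy.internal:3128"     # placeholder

print(get_proxy_url())  # "http://main-proxy.internal:3128" -- HTTPS_PROXY takes precedence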

get_ssl_context

get_ssl_context()

Get SSL context configuration from environment variables.

Returns:

    str | bool: SSL verification configuration:
        - Path to CA bundle file
        - True for default verification
        - False to disable verification (insecure)

Source code in ccproxy/http/client.py
def get_ssl_context() -> str | bool:
    """Get SSL context configuration from environment variables.

    Returns:
        SSL verification configuration:
        - Path to CA bundle file
        - True for default verification
        - False to disable verification (insecure)
    """
    # Check for custom CA bundle
    ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("SSL_CERT_FILE")

    # Check if SSL verification should be disabled (NOT RECOMMENDED)
    ssl_verify = os.environ.get("SSL_VERIFY", "true").lower()

    if ca_bundle and Path(ca_bundle).exists():
        logger.debug(
            "ssl_ca_bundle_configured",
            ca_bundle_path=ca_bundle,
            operation="get_ssl_context",
        )
        return ca_bundle
    elif ssl_verify in ("false", "0", "no"):
        logger.warning(
            "ssl_verification_disabled",
            ssl_verify_value=ssl_verify,
            operation="get_ssl_context",
            security_warning=True,
        )
        return False
    else:
        return True