Skip to content

ccproxy.docker

ccproxy.docker

Docker integration module for Claude Code Proxy.

This module provides a comprehensive Docker integration system with support for: - Protocol-based adapter design for better testing and flexibility - Enhanced error handling with contextual information - Real-time output streaming with middleware support - Comprehensive port publishing (including host interface binding) - Unified path management using DockerPath - User context management with proper UID/GID mapping

DockerAdapter

Implementation of Docker adapter.

is_available async

is_available()

Check if Docker is available on the system.

Source code in ccproxy/docker/adapter.py
async def is_available(self) -> bool:
    """Check if Docker is available on the system."""
    docker_cmd = ["docker", "--version"]
    cmd_str = " ".join(docker_cmd)

    try:
        process = await asyncio.create_subprocess_exec(
            *docker_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        if process.returncode == 0:
            docker_version = stdout.decode().strip()
            logger.debug("docker_available", version=docker_version)
            return True
        else:
            stderr_text = stderr.decode() if stderr else "unknown error"
            logger.warning(
                "docker_command_failed", command=cmd_str, error=stderr_text
            )
            return False

    except FileNotFoundError:
        logger.warning("docker_executable_not_found")
        return False

    except Exception as e:
        logger.warning("docker_availability_check_error", error=str(e))
        return False

run_container async

run_container(
    image,
    volumes,
    environment,
    command=None,
    middleware=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Run a Docker container with specified configuration.

Source code in ccproxy/docker/adapter.py
async def run_container(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    middleware: OutputMiddleware[T] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> ProcessResult[T]:
    """Run a Docker container with specified configuration."""

    docker_cmd = ["docker", "run", "--rm"]

    # Add user context if provided and should be used
    if user_context and user_context.should_use_user_mapping():
        docker_user_flag = user_context.get_docker_user_flag()
        docker_cmd.extend(["--user", docker_user_flag])
        logger.debug("docker_user_mapping", user_flag=docker_user_flag)

    # Add custom entrypoint if specified
    if entrypoint:
        docker_cmd.extend(["--entrypoint", entrypoint])
        logger.debug("docker_custom_entrypoint", entrypoint=entrypoint)

    # Add port publishing if specified
    if ports:
        for port_spec in ports:
            validated_port = validate_port_spec(port_spec)
            docker_cmd.extend(["-p", validated_port])
            logger.debug("docker_port_mapping", port=validated_port)

    # Add volume mounts
    for host_path, container_path in volumes:
        docker_cmd.extend(["-v", f"{host_path}:{container_path}"])

    # Add environment variables
    for key, value in environment.items():
        docker_cmd.extend(["-e", f"{key}={value}"])

    # Add image
    docker_cmd.append(image)

    # Add command if specified
    if command:
        docker_cmd.extend(command)

    cmd_str = " ".join(shlex.quote(arg) for arg in docker_cmd)
    logger.debug("docker_command", command=cmd_str)

    try:
        if middleware is None:
            # Cast is needed because T is unbound at this point
            middleware = cast(OutputMiddleware[T], LoggerOutputMiddleware(logger))

        # Try with sudo fallback if needed
        result = await self._run_with_sudo_fallback(docker_cmd, middleware)

        return result

    except FileNotFoundError as e:
        error = create_docker_error(f"Docker executable not found: {e}", cmd_str, e)
        logger.error("docker_executable_not_found", error=str(e))
        raise error from e

    except Exception as e:
        error = create_docker_error(
            f"Failed to run Docker container: {e}",
            cmd_str,
            e,
            {
                "image": image,
                "volumes_count": len(volumes),
                "env_vars_count": len(environment),
            },
        )
        logger.error("docker_container_run_error", error=str(e))
        raise error from e

run async

run(
    image,
    volumes,
    environment,
    command=None,
    middleware=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Run a Docker container with specified configuration.

This is an alias for run_container method.

Source code in ccproxy/docker/adapter.py
async def run(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    middleware: OutputMiddleware[T] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> ProcessResult[T]:
    """Run a Docker container with specified configuration.

    This is an alias for run_container method.
    """
    return await self.run_container(
        image=image,
        volumes=volumes,
        environment=environment,
        command=command,
        middleware=middleware,
        user_context=user_context,
        entrypoint=entrypoint,
        ports=ports,
    )

exec_container

exec_container(
    image,
    volumes,
    environment,
    command=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Execute a Docker container by replacing the current process.

This method builds the Docker command and replaces the current process with the Docker command using os.execvp, effectively handing over control to Docker.

Parameters:

Name Type Description Default
image str

Docker image name/tag to run

required
volumes list[DockerVolume]

List of volume mounts (host_path, container_path)

required
environment DockerEnv

Dictionary of environment variables

required
command list[str] | None

Optional command to run in the container

None
user_context DockerUserContext | None

Optional user context for Docker --user flag

None
entrypoint str | None

Optional custom entrypoint to override the image's default

None
ports list[DockerPortSpec] | None

Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

None

Raises:

Type Description
DockerError

If the container fails to execute

OSError

If the command cannot be executed

Source code in ccproxy/docker/adapter.py
def exec_container(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> None:
    """Execute a Docker container by replacing the current process.

    This method builds the Docker command and replaces the current process
    with the Docker command using os.execvp, effectively handing over control to Docker.

    Args:
        image: Docker image name/tag to run
        volumes: List of volume mounts (host_path, container_path)
        environment: Dictionary of environment variables
        command: Optional command to run in the container
        user_context: Optional user context for Docker --user flag
        entrypoint: Optional custom entrypoint to override the image's default
        ports: Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

    Raises:
        DockerError: If the container fails to execute
        OSError: If the command cannot be executed
    """
    docker_cmd = ["docker", "run", "--rm", "-it"]

    # Add user context if provided and should be used
    if user_context and user_context.should_use_user_mapping():
        docker_user_flag = user_context.get_docker_user_flag()
        docker_cmd.extend(["--user", docker_user_flag])
        logger.debug("docker_user_mapping", user_flag=docker_user_flag)

    # Add custom entrypoint if specified
    if entrypoint:
        docker_cmd.extend(["--entrypoint", entrypoint])
        logger.debug("docker_custom_entrypoint", entrypoint=entrypoint)

    # Add port publishing if specified
    if ports:
        for port_spec in ports:
            validated_port = validate_port_spec(port_spec)
            docker_cmd.extend(["-p", validated_port])
            logger.debug("docker_port_mapping", port=validated_port)

    # Add volume mounts
    for host_path, container_path in volumes:
        docker_cmd.extend(["-v", f"{host_path}:{container_path}"])

    # Add environment variables
    for key, value in environment.items():
        docker_cmd.extend(["-e", f"{key}={value}"])

    # Add image
    docker_cmd.append(image)

    # Add command if specified
    if command:
        docker_cmd.extend(command)

    cmd_str = " ".join(shlex.quote(arg) for arg in docker_cmd)
    logger.info("docker_execvp", command=cmd_str)

    try:
        # Check if we need sudo (without running the actual command)
        # Note: We can't use await here since this method replaces the process
        # Use a simple check instead
        try:
            import subprocess

            subprocess.run(
                ["docker", "info"], check=True, capture_output=True, text=True
            )
            needs_sudo = False
        except subprocess.CalledProcessError as e:
            needs_sudo = e.stderr and (
                "permission denied" in e.stderr.lower()
                or "dial unix" in e.stderr.lower()
                or "connect: permission denied" in e.stderr.lower()
            )
        except Exception:
            needs_sudo = False

        if needs_sudo:
            logger.info("docker_using_sudo_for_execution")
            docker_cmd = ["sudo"] + docker_cmd

        # Replace current process with Docker command
        os.execvp(docker_cmd[0], docker_cmd)

    except FileNotFoundError as e:
        error = create_docker_error(f"Docker executable not found: {e}", cmd_str, e)
        logger.error("docker_execvp_executable_not_found", error=str(e))
        raise error from e

    except OSError as e:
        error = create_docker_error(
            f"Failed to execute Docker command: {e}", cmd_str, e
        )
        logger.error("docker_execvp_os_error", error=str(e))
        raise error from e

    except Exception as e:
        error = create_docker_error(
            f"Unexpected error executing Docker container: {e}",
            cmd_str,
            e,
            {
                "image": image,
                "volumes_count": len(volumes),
                "env_vars_count": len(environment),
            },
        )
        logger.error("docker_execvp_unexpected_error", error=str(e))
        raise error from e

build_image async

build_image(
    dockerfile_dir,
    image_name,
    image_tag="latest",
    no_cache=False,
    middleware=None,
)

Build a Docker image from a Dockerfile.

Source code in ccproxy/docker/adapter.py
async def build_image(
    self,
    dockerfile_dir: Path,
    image_name: str,
    image_tag: str = "latest",
    no_cache: bool = False,
    middleware: OutputMiddleware[T] | None = None,
) -> ProcessResult[T]:
    """Build a Docker image from a Dockerfile."""

    image_full_name = f"{image_name}:{image_tag}"

    # Check Docker availability
    if not await self.is_available():
        error = create_docker_error(
            "Docker is not available or not properly installed",
            None,
            None,
            {"image": image_full_name},
        )
        logger.error("docker_not_available_for_build", image=image_full_name)
        raise error

    # Validate dockerfile directory
    dockerfile_dir = Path(dockerfile_dir).resolve()
    if not dockerfile_dir.exists() or not dockerfile_dir.is_dir():
        error = create_docker_error(
            f"Dockerfile directory not found: {dockerfile_dir}",
            None,
            None,
            {"dockerfile_dir": str(dockerfile_dir), "image": image_full_name},
        )
        logger.error(
            "dockerfile_directory_invalid", dockerfile_dir=str(dockerfile_dir)
        )
        raise error

    # Check for Dockerfile
    dockerfile_path = dockerfile_dir / "Dockerfile"
    if not dockerfile_path.exists():
        error = create_docker_error(
            f"Dockerfile not found: {dockerfile_path}",
            None,
            None,
            {"dockerfile_path": str(dockerfile_path), "image": image_full_name},
        )
        logger.error("dockerfile_not_found", dockerfile_path=str(dockerfile_path))
        raise error

    # Build the Docker command
    docker_cmd = [
        "docker",
        "build",
        "-t",
        image_full_name,
    ]

    if no_cache:
        docker_cmd.append("--no-cache")

    docker_cmd.append(str(dockerfile_dir))

    # Format command for logging
    cmd_str = " ".join(shlex.quote(arg) for arg in docker_cmd)
    logger.info("docker_build_starting", image=image_full_name)
    logger.debug("docker_command", command=cmd_str)

    try:
        if middleware is None:
            # Cast is needed because T is unbound at this point
            middleware = cast(OutputMiddleware[T], LoggerOutputMiddleware(logger))

        result = await self._run_with_sudo_fallback(docker_cmd, middleware)

        return result

    except FileNotFoundError as e:
        error = create_docker_error(f"Docker executable not found: {e}", cmd_str, e)
        logger.error("docker_build_executable_not_found", error=str(e))
        raise error from e

    except Exception as e:
        error = create_docker_error(
            f"Unexpected error building Docker image: {e}",
            cmd_str,
            e,
            {"image": image_full_name, "dockerfile_dir": str(dockerfile_dir)},
        )

        logger.error(
            "docker_build_unexpected_error", image=image_full_name, error=str(e)
        )
        raise error from e

image_exists async

image_exists(image_name, image_tag='latest')

Check if a Docker image exists locally.

Source code in ccproxy/docker/adapter.py
async def image_exists(self, image_name: str, image_tag: str = "latest") -> bool:
    """Check if a Docker image exists locally."""
    image_full_name = f"{image_name}:{image_tag}"

    # Check Docker availability
    if not await self.is_available():
        logger.warning(
            "docker_not_available_for_image_check", image=image_full_name
        )
        return False

    # Build the Docker command to check image existence
    docker_cmd = ["docker", "inspect", image_full_name]
    cmd_str = " ".join(shlex.quote(arg) for arg in docker_cmd)

    try:
        # Run Docker inspect command
        process = await asyncio.create_subprocess_exec(
            *docker_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()

        if process.returncode == 0:
            logger.debug("docker_image_exists", image=image_full_name)
            return True

        # Check if this is a permission error, try with sudo
        stderr_text = stderr.decode() if stderr else ""
        if any(
            phrase in stderr_text.lower()
            for phrase in [
                "permission denied",
                "dial unix",
                "connect: permission denied",
            ]
        ):
            try:
                logger.debug("docker_image_check_permission_denied_using_sudo")
                sudo_cmd = ["sudo"] + docker_cmd
                sudo_process = await asyncio.create_subprocess_exec(
                    *sudo_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                await sudo_process.communicate()
                if sudo_process.returncode == 0:
                    logger.debug(
                        "docker_image_exists_with_sudo", image=image_full_name
                    )
                    return True
                else:
                    # Image doesn't exist even with sudo
                    logger.debug(
                        "docker_image_does_not_exist", image=image_full_name
                    )
                    return False
            except Exception:
                # Image doesn't exist even with sudo
                logger.debug("Docker image does not exist: %s", image_full_name)
                return False
        else:
            # Image doesn't exist (inspect returns non-zero exit code)
            logger.debug("Docker image does not exist: %s", image_full_name)
            return False

    except FileNotFoundError:
        logger.warning("docker_image_check_executable_not_found")
        return False

    except Exception as e:
        logger.warning("docker_image_check_unexpected_error", error=str(e))
        return False

pull_image async

pull_image(image_name, image_tag='latest', middleware=None)

Pull a Docker image from registry.

Source code in ccproxy/docker/adapter.py
async def pull_image(
    self,
    image_name: str,
    image_tag: str = "latest",
    middleware: OutputMiddleware[T] | None = None,
) -> ProcessResult[T]:
    """Pull a Docker image from registry."""

    image_full_name = f"{image_name}:{image_tag}"

    # Check Docker availability
    if not await self.is_available():
        error = create_docker_error(
            "Docker is not available or not properly installed",
            None,
            None,
            {"image": image_full_name},
        )
        logger.error("docker_not_available_for_pull", image=image_full_name)
        raise error

    # Build the Docker command
    docker_cmd = ["docker", "pull", image_full_name]

    # Format command for logging
    cmd_str = " ".join(shlex.quote(arg) for arg in docker_cmd)
    logger.info("docker_pull_starting", image=image_full_name)
    logger.debug("docker_command", command=cmd_str)

    try:
        if middleware is None:
            # Cast is needed because T is unbound at this point
            middleware = cast(OutputMiddleware[T], LoggerOutputMiddleware(logger))

        result = await self._run_with_sudo_fallback(docker_cmd, middleware)

        return result

    except FileNotFoundError as e:
        error = create_docker_error(f"Docker executable not found: {e}", cmd_str, e)
        logger.error("docker_pull_executable_not_found", error=str(e))
        raise error from e

    except Exception as e:
        error = create_docker_error(
            f"Unexpected error pulling Docker image: {e}",
            cmd_str,
            e,
            {"image": image_full_name},
        )

        logger.error(
            "docker_pull_unexpected_error", image=image_full_name, error=str(e)
        )
        raise error from e

DockerPath

Bases: BaseModel

Represents a mapping between host and container paths.

Provides a clean API for Docker volume mounting and path resolution.

Example

workspace = DockerPath(host_path="/some/host/local/path", container_path="/tmp/docker/workspace") docker_vol = workspace.vol() # Returns volume mapping tuple container_path = workspace.container() # Returns container path host_path = workspace.host() # Returns host path

vol

vol()

Get Docker volume mapping tuple.

Returns:

Type Description
tuple[str, str]

tuple[str, str]: (host_path, container_path) for Docker -v flag

Source code in ccproxy/docker/docker_path.py
def vol(self) -> tuple[str, str]:
    """Get Docker volume mapping tuple.

    Returns:
        tuple[str, str]: (host_path, container_path) for Docker -v flag
    """
    if self.host_path is None:
        raise ValueError("host_path is not set, cannot create volume mapping")
    return (str(self.host_path), self.container_path)

host

host()

Get host path as Path object.

Returns:

Name Type Description
Path Path

Resolved host path

Source code in ccproxy/docker/docker_path.py
def host(self) -> Path:
    """Get host path as Path object.

    Returns:
        Path: Resolved host path
    """
    if self.host_path is None:
        raise ValueError("host_path is not set")
    return self.host_path

container

container()

Get container path as string.

Returns:

Name Type Description
str str

Container path

Source code in ccproxy/docker/docker_path.py
def container(self) -> str:
    """Get container path as string.

    Returns:
        str: Container path
    """
    return self.container_path

join

join(*subpaths)

Create new DockerPath with subpaths joined to both host and container paths.

Parameters:

Name Type Description Default
*subpaths str

Path components to join

()

Returns:

Name Type Description
DockerPath DockerPath

New instance with joined paths

Source code in ccproxy/docker/docker_path.py
def join(self, *subpaths: str) -> "DockerPath":
    """Create new DockerPath with subpaths joined to both host and container paths.

    Args:
        *subpaths: Path components to join

    Returns:
        DockerPath: New instance with joined paths
    """
    host_joined = self.host_path
    if host_joined:
        for subpath in subpaths:
            host_joined = host_joined / subpath

    container_joined = self.container_path
    for subpath in subpaths:
        container_joined = f"{container_joined}/{subpath}".replace("//", "/")

    return DockerPath(host_path=host_joined, container_path=container_joined)

DockerPathSet

DockerPathSet(base_host_path=None)

Collection of named Docker paths for organized path management.

Example

paths = DockerPathSet("/tmp/build") paths.add("workspace", "/workspace") paths.add("config", "/workspace/config")

workspace_vol = paths.get("workspace").vol() config_path = paths.get("config").container()

Parameters:

Name Type Description Default
base_host_path str | Path | None

Base path on host for all paths in this set

None
Source code in ccproxy/docker/docker_path.py
def __init__(self, base_host_path: str | Path | None = None) -> None:
    """Initialize Docker path set.

    Args:
        base_host_path: Base path on host for all paths in this set
    """
    self.base_host_path = Path(base_host_path).resolve() if base_host_path else None
    self.paths: dict[str, DockerPath] = {}
    self.logger = get_logger(f"{__name__}.{self.__class__.__name__}")

add

add(name, container_path, host_subpath=None)

Add a named Docker path to the set.

Parameters:

Name Type Description Default
name str

Logical name for the path

required
container_path str

Path inside the Docker container

required
host_subpath str | None

Optional subpath from base_host_path, defaults to name

None

Returns:

Name Type Description
Self Self

For method chaining

Source code in ccproxy/docker/docker_path.py
def add(
    self, name: str, container_path: str, host_subpath: str | None = None
) -> Self:
    """Add a named Docker path to the set.

    Args:
        name: Logical name for the path
        container_path: Path inside the Docker container
        host_subpath: Optional subpath from base_host_path, defaults to name

    Returns:
        Self: For method chaining
    """
    if self.base_host_path is None:
        raise ValueError("base_host_path must be set to use add() method")

    if host_subpath is None:
        host_subpath = name

    # Handle empty string to mean no subpath (use base path directly)
    if host_subpath == "":
        host_path = self.base_host_path
    else:
        host_path = self.base_host_path / host_subpath

    self.paths[name] = DockerPath(
        host_path=host_path, container_path=container_path
    )
    return self

add_path

add_path(name, docker_path)

Add a pre-created DockerPath to the set.

Parameters:

Name Type Description Default
name str

Logical name for the path

required
docker_path DockerPath

DockerPath instance to add

required

Returns:

Name Type Description
Self Self

For method chaining

Source code in ccproxy/docker/docker_path.py
def add_path(self, name: str, docker_path: DockerPath) -> Self:
    """Add a pre-created DockerPath to the set.

    Args:
        name: Logical name for the path
        docker_path: DockerPath instance to add

    Returns:
        Self: For method chaining
    """
    self.paths[name] = docker_path
    return self

get

get(name)

Get Docker path by name.

Parameters:

Name Type Description Default
name str

Logical name of the path

required

Returns:

Name Type Description
DockerPath DockerPath

The Docker path instance

Raises:

Type Description
KeyError

If path name is not found

Source code in ccproxy/docker/docker_path.py
def get(self, name: str) -> DockerPath:
    """Get Docker path by name.

    Args:
        name: Logical name of the path

    Returns:
        DockerPath: The Docker path instance

    Raises:
        KeyError: If path name is not found
    """
    if name not in self.paths:
        raise KeyError(
            f"Docker path '{name}' not found. Available: {list(self.paths.keys())}"
        )
    return self.paths[name]

has

has(name)

Check if a path name exists in the set.

Parameters:

Name Type Description Default
name str

Logical name to check

required

Returns:

Name Type Description
bool bool

True if path exists

Source code in ccproxy/docker/docker_path.py
def has(self, name: str) -> bool:
    """Check if a path name exists in the set.

    Args:
        name: Logical name to check

    Returns:
        bool: True if path exists
    """
    return name in self.paths

volumes

volumes()

Get all volume mappings for Docker.

Returns:

Type Description
list[tuple[str, str]]

list[tuple[str, str]]: List of (host_path, container_path) tuples

Source code in ccproxy/docker/docker_path.py
def volumes(self) -> list[tuple[str, str]]:
    """Get all volume mappings for Docker.

    Returns:
        list[tuple[str, str]]: List of (host_path, container_path) tuples
    """
    return [path.vol() for path in self.paths.values()]

names

names()

Get all path names in the set.

Returns:

Type Description
list[str]

list[str]: List of logical path names

Source code in ccproxy/docker/docker_path.py
def names(self) -> list[str]:
    """Get all path names in the set.

    Returns:
        list[str]: List of logical path names
    """
    return list(self.paths.keys())

LoggerOutputMiddleware

LoggerOutputMiddleware(
    logger, stdout_prefix="", stderr_prefix=""
)

Bases: OutputMiddleware[str]

Simple middleware that prints output with optional prefixes.

This middleware prints each line to the console with configurable prefixes for stdout and stderr streams.

Parameters:

Name Type Description Default
stdout_prefix str

Prefix for stdout lines (default: "")

''
stderr_prefix str

Prefix for stderr lines (default: "")

''
Source code in ccproxy/docker/middleware.py
def __init__(self, logger: Any, stdout_prefix: str = "", stderr_prefix: str = ""):
    """Initialize middleware with custom prefixes.

    Args:
        stdout_prefix: Prefix for stdout lines (default: "")
        stderr_prefix: Prefix for stderr lines (default: "")
    """
    self.logger = logger
    self.stderr_prefix = stderr_prefix
    self.stdout_prefix = stdout_prefix

process async

process(line, stream_type)

Process and print a line with the appropriate prefix.

Parameters:

Name Type Description Default
line str

Output line to process

required
stream_type str

Either "stdout" or "stderr"

required

Returns:

Type Description
str

The original line (unmodified)

Source code in ccproxy/docker/middleware.py
async def process(self, line: str, stream_type: str) -> str:
    """Process and print a line with the appropriate prefix.

    Args:
        line: Output line to process
        stream_type: Either "stdout" or "stderr"

    Returns:
        The original line (unmodified)
    """
    if stream_type == "stdout":
        self.logger.info(
            "docker_stdout", prefix=self.stdout_prefix, line=line, stream="stdout"
        )
    else:
        self.logger.info(
            "docker_stderr", prefix=self.stderr_prefix, line=line, stream="stderr"
        )
    return line

DockerUserContext

Bases: BaseModel

Docker user context for volume permission handling.

Represents user information needed for Docker --user flag to solve volume permission issues when mounting host directories.

validate_positive_ids classmethod

validate_positive_ids(v)

Validate that UID/GID are positive integers.

Source code in ccproxy/docker/models.py
@field_validator("uid", "gid")
@classmethod
def validate_positive_ids(cls, v: int) -> int:
    """Validate that UID/GID are positive integers."""
    if v < 0:
        raise ValueError("UID and GID must be non-negative")
    return v

validate_username classmethod

validate_username(v)

Validate username is not empty.

Source code in ccproxy/docker/models.py
@field_validator("username")
@classmethod
def validate_username(cls, v: str) -> str:
    """Validate username is not empty."""
    if not v or not v.strip():
        raise ValueError("Username cannot be empty")
    return v.strip()

detect_current_user classmethod

detect_current_user(home_path=None, workspace_path=None)

Detect current user context from system.

Parameters:

Name Type Description Default
home_path DockerPath | None

Optional home directory DockerPath override

None
workspace_path DockerPath | None

Optional workspace directory DockerPath override

None

Returns:

Name Type Description
DockerUserContext DockerUserContext

Current user's context

Raises:

Type Description
RuntimeError

If user detection fails or platform unsupported

Source code in ccproxy/docker/models.py
@classmethod
def detect_current_user(
    cls,
    home_path: DockerPath | None = None,
    workspace_path: DockerPath | None = None,
) -> "DockerUserContext":
    """Detect current user context from system.

    Args:
        home_path: Optional home directory DockerPath override
        workspace_path: Optional workspace directory DockerPath override

    Returns:
        DockerUserContext: Current user's context

    Raises:
        RuntimeError: If user detection fails or platform unsupported
    """
    current_platform = platform.system()

    if current_platform not in cls._supported_platforms:
        raise RuntimeError(
            f"User detection not supported on {current_platform}. "
            f"Supported platforms: {', '.join(cls._supported_platforms)}"
        )

    try:
        uid = os.getuid()
        gid = os.getgid()
        username = os.getenv("USER") or os.getenv("USERNAME") or "unknown"

        # Create default home path if not provided
        if home_path is None:
            host_home_env = os.getenv("HOME")
            if host_home_env:
                home_path = DockerPath(
                    host_path=Path(host_home_env), container_path="/data/home"
                )

        return cls(
            uid=uid,
            gid=gid,
            username=username,
            enable_user_mapping=True,
            home_path=home_path,
            workspace_path=workspace_path,
        )

    except AttributeError as e:
        raise RuntimeError(
            f"Failed to detect user on {current_platform}: {e}"
        ) from e
    except Exception as e:
        raise RuntimeError(f"Unexpected error detecting user: {e}") from e

create_manual classmethod

create_manual(
    uid,
    gid,
    username,
    home_path=None,
    workspace_path=None,
    enable_user_mapping=True,
)

Create manual user context with custom values.

Parameters:

Name Type Description Default
uid int

User ID for Docker --user flag

required
gid int

Group ID for Docker --user flag

required
username str

Username for reference

required
home_path DockerPath | None

Optional home directory DockerPath

None
workspace_path DockerPath | None

Optional workspace directory DockerPath

None
enable_user_mapping bool

Whether to enable --user flag in Docker commands

True

Returns:

Name Type Description
DockerUserContext DockerUserContext

Manual user context

Raises:

Type Description
ValueError

If validation fails for any parameter

Source code in ccproxy/docker/models.py
@classmethod
def create_manual(
    cls,
    uid: int,
    gid: int,
    username: str,
    home_path: DockerPath | None = None,
    workspace_path: DockerPath | None = None,
    enable_user_mapping: bool = True,
) -> "DockerUserContext":
    """Create manual user context with custom values.

    Args:
        uid: User ID for Docker --user flag
        gid: Group ID for Docker --user flag
        username: Username for reference
        home_path: Optional home directory DockerPath
        workspace_path: Optional workspace directory DockerPath
        enable_user_mapping: Whether to enable --user flag in Docker commands

    Returns:
        DockerUserContext: Manual user context

    Raises:
        ValueError: If validation fails for any parameter
    """
    return cls(
        uid=uid,
        gid=gid,
        username=username,
        enable_user_mapping=enable_user_mapping,
        home_path=home_path,
        workspace_path=workspace_path,
    )

get_docker_user_flag

get_docker_user_flag()

Get Docker --user flag value.

Returns:

Name Type Description
str str

Docker user flag in format "uid:gid"

Source code in ccproxy/docker/models.py
def get_docker_user_flag(self) -> str:
    """Get Docker --user flag value.

    Returns:
        str: Docker user flag in format "uid:gid"
    """
    return f"{self.uid}:{self.gid}"

is_supported_platform

is_supported_platform()

Check if current platform supports user mapping.

Returns:

Name Type Description
bool bool

True if platform supports user mapping

Source code in ccproxy/docker/models.py
def is_supported_platform(self) -> bool:
    """Check if current platform supports user mapping.

    Returns:
        bool: True if platform supports user mapping
    """
    return platform.system() in self._supported_platforms

should_use_user_mapping

should_use_user_mapping()

Check if user mapping should be used.

Returns:

Name Type Description
bool bool

True if user mapping is enabled and platform is supported

Source code in ccproxy/docker/models.py
def should_use_user_mapping(self) -> bool:
    """Check if user mapping should be used.

    Returns:
        bool: True if user mapping is enabled and platform is supported
    """
    return self.enable_user_mapping and self.is_supported_platform()

get_environment_variables

get_environment_variables()

Get environment variables for home and workspace directory configuration.

Returns:

Type Description
dict[str, str]

dict[str, str]: Environment variables to set in container

Source code in ccproxy/docker/models.py
def get_environment_variables(self) -> dict[str, str]:
    """Get environment variables for home and workspace directory configuration.

    Returns:
        dict[str, str]: Environment variables to set in container
    """
    env = {}
    if self.home_path:
        env["HOME"] = self.home_path.container()
        env["CLAUDE_HOME"] = self.home_path.container()
    if self.workspace_path:
        env["CLAUDE_WORKSPACE"] = self.workspace_path.container()
    return env

get_volumes

get_volumes()

Get Docker volume mappings for home and workspace directories.

Returns:

Type Description
list[tuple[str, str]]

list[tuple[str, str]]: List of (host_path, container_path) tuples

Source code in ccproxy/docker/models.py
def get_volumes(self) -> list[tuple[str, str]]:
    """Get Docker volume mappings for home and workspace directories.

    Returns:
        list[tuple[str, str]]: List of (host_path, container_path) tuples
    """
    volumes = []
    if self.home_path and self.home_path.host_path:
        volumes.append(self.home_path.vol())
    if self.workspace_path and self.workspace_path.host_path:
        volumes.append(self.workspace_path.vol())
    return volumes

get_home_volumes

get_home_volumes()

Get Docker volume mappings for home directory only (for backwards compatibility).

Returns:

Type Description
list[tuple[str, str]]

list[tuple[str, str]]: List of (host_path, container_path) tuples

Source code in ccproxy/docker/models.py
def get_home_volumes(self) -> list[tuple[str, str]]:
    """Get Docker volume mappings for home directory only (for backwards compatibility).

    Returns:
        list[tuple[str, str]]: List of (host_path, container_path) tuples
    """
    volumes = []
    if self.home_path and self.home_path.host_path:
        volumes.append(self.home_path.vol())
    return volumes

describe_context

describe_context()

Get human-readable description of user context.

Returns:

Name Type Description
str str

Description of user context for debugging

Source code in ccproxy/docker/models.py
def describe_context(self) -> str:
    """Get human-readable description of user context.

    Returns:
        str: Description of user context for debugging
    """
    parts = [
        f"uid={self.uid}",
        f"gid={self.gid}",
        f"username={self.username}",
    ]

    if self.home_path:
        parts.append(f"home_path={self.home_path}")

    if self.workspace_path:
        parts.append(f"workspace_path={self.workspace_path}")

    return f"DockerUserContext({', '.join(parts)})"

DockerAdapterProtocol

Bases: Protocol

Protocol for Docker operations.

is_available

is_available()

Check if Docker is available on the system.

Returns:

Type Description
Awaitable[bool]

True if Docker is available, False otherwise

Source code in ccproxy/docker/protocol.py
def is_available(self) -> Awaitable[bool]:
    """Check if Docker is available on the system.

    Returns:
        True if Docker is available, False otherwise
    """
    ...

run

run(
    image,
    volumes,
    environment,
    command=None,
    middleware=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Run a Docker container with specified configuration.

Alias for run_container method.

Parameters:

Name Type Description Default
image str

Docker image name/tag to run

required
volumes list[DockerVolume]

List of volume mounts (host_path, container_path)

required
environment DockerEnv

Dictionary of environment variables

required
command list[str] | None

Optional command to run in the container

None
middleware OutputMiddleware[T] | None

Optional middleware for processing output

None
user_context DockerUserContext | None

Optional user context for Docker --user flag

None
entrypoint str | None

Optional custom entrypoint to override the image's default

None
ports list[DockerPortSpec] | None

Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

None

Returns:

Type Description
Awaitable[ProcessResult[T]]

Tuple containing (return_code, stdout_lines, stderr_lines)

Raises:

Type Description
DockerError

If the container fails to run

Source code in ccproxy/docker/protocol.py
def run(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    middleware: OutputMiddleware[T] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> Awaitable[ProcessResult[T]]:
    """Run a Docker container with specified configuration.

    Alias for run_container method.

    Args:
        image: Docker image name/tag to run
        volumes: List of volume mounts (host_path, container_path)
        environment: Dictionary of environment variables
        command: Optional command to run in the container
        middleware: Optional middleware for processing output
        user_context: Optional user context for Docker --user flag
        entrypoint: Optional custom entrypoint to override the image's default
        ports: Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

    Returns:
        Tuple containing (return_code, stdout_lines, stderr_lines)

    Raises:
        DockerError: If the container fails to run
    """
    ...

run_container

run_container(
    image,
    volumes,
    environment,
    command=None,
    middleware=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Run a Docker container with specified configuration.

Parameters:

Name Type Description Default
image str

Docker image name/tag to run

required
volumes list[DockerVolume]

List of volume mounts (host_path, container_path)

required
environment DockerEnv

Dictionary of environment variables

required
command list[str] | None

Optional command to run in the container

None
middleware OutputMiddleware[T] | None

Optional middleware for processing output

None
user_context DockerUserContext | None

Optional user context for Docker --user flag

None
entrypoint str | None

Optional custom entrypoint to override the image's default

None
ports list[DockerPortSpec] | None

Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

None

Returns:

Type Description
Awaitable[ProcessResult[T]]

Tuple containing (return_code, stdout_lines, stderr_lines)

Raises:

Type Description
DockerError

If the container fails to run

Source code in ccproxy/docker/protocol.py
def run_container(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    middleware: OutputMiddleware[T] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> Awaitable[ProcessResult[T]]:
    """Run a Docker container with specified configuration.

    Args:
        image: Docker image name/tag to run
        volumes: List of volume mounts (host_path, container_path)
        environment: Dictionary of environment variables
        command: Optional command to run in the container
        middleware: Optional middleware for processing output
        user_context: Optional user context for Docker --user flag
        entrypoint: Optional custom entrypoint to override the image's default
        ports: Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

    Returns:
        Tuple containing (return_code, stdout_lines, stderr_lines)

    Raises:
        DockerError: If the container fails to run
    """
    ...

exec_container

exec_container(
    image,
    volumes,
    environment,
    command=None,
    user_context=None,
    entrypoint=None,
    ports=None,
)

Execute a Docker container by replacing the current process.

This method builds the Docker command and replaces the current process with the Docker command using os.execvp, effectively handing over control to Docker.

Parameters:

Name Type Description Default
image str

Docker image name/tag to run

required
volumes list[DockerVolume]

List of volume mounts (host_path, container_path)

required
environment DockerEnv

Dictionary of environment variables

required
command list[str] | None

Optional command to run in the container

None
user_context DockerUserContext | None

Optional user context for Docker --user flag

None
entrypoint str | None

Optional custom entrypoint to override the image's default

None
ports list[DockerPortSpec] | None

Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

None

Raises:

Type Description
DockerError

If the container fails to execute

OSError

If the command cannot be executed

Source code in ccproxy/docker/protocol.py
def exec_container(
    self,
    image: str,
    volumes: list[DockerVolume],
    environment: DockerEnv,
    command: list[str] | None = None,
    user_context: DockerUserContext | None = None,
    entrypoint: str | None = None,
    ports: list[DockerPortSpec] | None = None,
) -> None:
    """Execute a Docker container by replacing the current process.

    This method builds the Docker command and replaces the current process
    with the Docker command using os.execvp, effectively handing over control to Docker.

    Args:
        image: Docker image name/tag to run
        volumes: List of volume mounts (host_path, container_path)
        environment: Dictionary of environment variables
        command: Optional command to run in the container
        user_context: Optional user context for Docker --user flag
        entrypoint: Optional custom entrypoint to override the image's default
        ports: Optional port specifications (e.g., ["8080:80", "localhost:9000:9000"])

    Raises:
        DockerError: If the container fails to execute
        OSError: If the command cannot be executed
    """
    ...

build_image

build_image(
    dockerfile_dir,
    image_name,
    image_tag="latest",
    no_cache=False,
    middleware=None,
)

Build a Docker image from a Dockerfile.

Parameters:

Name Type Description Default
dockerfile_dir Path

Directory containing the Dockerfile

required
image_name str

Name to tag the built image with

required
image_tag str

Tag to use for the image

'latest'
no_cache bool

Whether to use Docker's cache during build

False
middleware OutputMiddleware[T] | None

Optional middleware for processing output

None

Returns:

Type Description
Awaitable[ProcessResult[T]]

ProcessResult containing (return_code, stdout_lines, stderr_lines)

Raises:

Type Description
DockerError

If the image fails to build

Source code in ccproxy/docker/protocol.py
def build_image(
    self,
    dockerfile_dir: Path,
    image_name: str,
    image_tag: str = "latest",
    no_cache: bool = False,
    middleware: OutputMiddleware[T] | None = None,
) -> Awaitable[ProcessResult[T]]:
    """Build a Docker image from a Dockerfile.

    Args:
        dockerfile_dir: Directory containing the Dockerfile
        image_name: Name to tag the built image with
        image_tag: Tag to use for the image
        no_cache: Whether to use Docker's cache during build
        middleware: Optional middleware for processing output

    Returns:
        ProcessResult containing (return_code, stdout_lines, stderr_lines)

    Raises:
        DockerError: If the image fails to build
    """
    ...

image_exists

image_exists(image_name, image_tag='latest')

Check if a Docker image exists locally.

Parameters:

Name Type Description Default
image_name str

Name of the image to check

required
image_tag str

Tag of the image to check

'latest'

Returns:

Type Description
Awaitable[bool]

True if the image exists locally, False otherwise

Source code in ccproxy/docker/protocol.py
def image_exists(
    self, image_name: str, image_tag: str = "latest"
) -> Awaitable[bool]:
    """Check if a Docker image exists locally.

    Args:
        image_name: Name of the image to check
        image_tag: Tag of the image to check

    Returns:
        True if the image exists locally, False otherwise
    """
    ...

pull_image

pull_image(image_name, image_tag='latest', middleware=None)

Pull a Docker image from registry.

Parameters:

Name Type Description Default
image_name str

Name of the image to pull

required
image_tag str

Tag of the image to pull

'latest'
middleware OutputMiddleware[T] | None

Optional middleware for processing output

None

Returns:

Type Description
Awaitable[ProcessResult[T]]

ProcessResult containing (return_code, stdout_lines, stderr_lines)

Raises:

Type Description
DockerError

If the image fails to pull

Source code in ccproxy/docker/protocol.py
def pull_image(
    self,
    image_name: str,
    image_tag: str = "latest",
    middleware: OutputMiddleware[T] | None = None,
) -> Awaitable[ProcessResult[T]]:
    """Pull a Docker image from registry.

    Args:
        image_name: Name of the image to pull
        image_tag: Tag of the image to pull
        middleware: Optional middleware for processing output

    Returns:
        ProcessResult containing (return_code, stdout_lines, stderr_lines)

    Raises:
        DockerError: If the image fails to pull
    """
    ...

ChainedOutputMiddleware

ChainedOutputMiddleware(middleware_chain)

Bases: OutputMiddleware[T]

Middleware that chains multiple middleware components together.

Processes output through a sequence of middleware components, where each middleware processes the output from the previous one. The final output type T is determined by the last middleware in the chain.

Example
# Chain progress tracking with logging
progress_middleware = CompilationProgressMiddleware(callback)
logger_middleware = LoggerOutputMiddleware(logger)

chained = ChainedOutputMiddleware([progress_middleware, logger_middleware])

# Process: line -> progress_middleware -> logger_middleware -> final result
result = docker_adapter.run_container("image", [], {}, middleware=chained)

Parameters:

Name Type Description Default
middleware_chain list[OutputMiddleware[Any]]

List of middleware components to chain together. Output flows from first to last middleware.

required

Raises:

Type Description
ValueError

If middleware_chain is empty

Source code in ccproxy/docker/stream_process.py
def __init__(self, middleware_chain: list[OutputMiddleware[Any]]) -> None:
    """Initialize chained middleware.

    Args:
        middleware_chain: List of middleware components to chain together.
                         Output flows from first to last middleware.

    Raises:
        ValueError: If middleware_chain is empty
    """
    if not middleware_chain:
        raise ValueError("Middleware chain cannot be empty")

    self.middleware_chain = middleware_chain

process async

process(line, stream_type)

Process line through the middleware chain.

Parameters:

Name Type Description Default
line str

Output line to process

required
stream_type str

Either "stdout" or "stderr"

required

Returns:

Type Description
T

Output from the final middleware in the chain

Source code in ccproxy/docker/stream_process.py
async def process(self, line: str, stream_type: str) -> T:
    """Process line through the middleware chain.

    Args:
        line: Output line to process
        stream_type: Either "stdout" or "stderr"

    Returns:
        Output from the final middleware in the chain
    """
    current_output: Any = line

    # Process through each middleware in sequence
    for middleware in self.middleware_chain:
        current_output = await middleware.process(current_output, stream_type)

    return cast(T, current_output)

DefaultOutputMiddleware

DefaultOutputMiddleware(
    stdout_prefix="", stderr_prefix="ERROR: "
)

Bases: OutputMiddleware[str]

Simple middleware that prints output with optional prefixes.

This middleware prints each line to the console with configurable prefixes for stdout and stderr streams.

Parameters:

Name Type Description Default
stdout_prefix str

Prefix for stdout lines (default: "")

''
stderr_prefix str

Prefix for stderr lines (default: "ERROR: ")

'ERROR: '
Source code in ccproxy/docker/stream_process.py
def __init__(self, stdout_prefix: str = "", stderr_prefix: str = "ERROR: ") -> None:
    """Initialize middleware with custom prefixes.

    Args:
        stdout_prefix: Prefix for stdout lines (default: "")
        stderr_prefix: Prefix for stderr lines (default: "ERROR: ")
    """
    self.stdout_prefix = stdout_prefix
    self.stderr_prefix = stderr_prefix

process async

process(line, stream_type)

Process and print a line with the appropriate prefix.

Parameters:

Name Type Description Default
line str

Output line to process

required
stream_type str

Either "stdout" or "stderr"

required

Returns:

Type Description
str

The original line (unmodified)

Source code in ccproxy/docker/stream_process.py
async def process(self, line: str, stream_type: str) -> str:
    """Process and print a line with the appropriate prefix.

    Args:
        line: Output line to process
        stream_type: Either "stdout" or "stderr"

    Returns:
        The original line (unmodified)
    """
    prefix = self.stdout_prefix if stream_type == "stdout" else self.stderr_prefix
    print(f"{prefix}{line}")
    return line

OutputMiddleware

Bases: Generic[T]

Base class for processing command output streams.

OutputMiddleware provides a way to intercept and process output lines from subprocesses. Implementations can format, filter, or transform the output as needed.

Type parameter T represents the return type of the process method, allowing middleware to transform strings into other types if needed.

process async

process(line, stream_type)

Process a line of output from a subprocess stream.

Parameters:

Name Type Description Default
line str

A line of text from the process output

required
stream_type str

Either "stdout" or "stderr"

required

Returns:

Type Description
T

Processed output of type T

Raises:

Type Description
NotImplementedError

Subclasses must implement this method

Source code in ccproxy/docker/stream_process.py
async def process(self, line: str, stream_type: str) -> T:
    """Process a line of output from a subprocess stream.

    Args:
        line: A line of text from the process output
        stream_type: Either "stdout" or "stderr"

    Returns:
        Processed output of type T

    Raises:
        NotImplementedError: Subclasses must implement this method
    """
    raise NotImplementedError()

create_docker_adapter

create_docker_adapter(
    image=None,
    volumes=None,
    environment=None,
    additional_args=None,
    user_context=None,
)

Factory function to create a DockerAdapter instance.

Parameters:

Name Type Description Default
image str | None

Docker image to use (optional)

None
volumes list[DockerVolume] | None

Optional list of volume mappings

None
environment DockerEnv | None

Optional environment variables

None
additional_args list[str] | None

Optional additional Docker arguments

None
user_context DockerUserContext | None

Optional user context for container

None

Returns:

Type Description
DockerAdapterProtocol

Configured DockerAdapter instance

Example

adapter = create_docker_adapter() if adapter.is_available(): ... adapter.run_container("ubuntu:latest", [], {})

Source code in ccproxy/docker/adapter.py
def create_docker_adapter(
    image: str | None = None,
    volumes: list[DockerVolume] | None = None,
    environment: DockerEnv | None = None,
    additional_args: list[str] | None = None,
    user_context: DockerUserContext | None = None,
) -> DockerAdapterProtocol:
    """
    Factory function to create a DockerAdapter instance.

    Args:
        image: Docker image to use (optional)
        volumes: Optional list of volume mappings
        environment: Optional environment variables
        additional_args: Optional additional Docker arguments
        user_context: Optional user context for container

    Returns:
        Configured DockerAdapter instance

    Example:
        >>> adapter = create_docker_adapter()
        >>> if adapter.is_available():
        ...     adapter.run_container("ubuntu:latest", [], {})
    """
    return DockerAdapter()

create_chained_docker_middleware

create_chained_docker_middleware(
    middleware_chain,
    include_logger=True,
    logger_instance=None,
    stdout_prefix="",
    stderr_prefix="",
)

Factory function to create chained middleware for Docker operations.

Parameters:

Name Type Description Default
middleware_chain list[OutputMiddleware[Any]]

List of middleware components to chain together

required
include_logger bool

Whether to automatically add logger middleware at the end

True
logger_instance Any | None

Logger instance to use (defaults to module logger)

None
stdout_prefix str

Prefix for stdout lines in logger middleware

''
stderr_prefix str

Prefix for stderr lines in logger middleware

''

Returns:

Type Description
OutputMiddleware[Any]

Chained middleware instance

Source code in ccproxy/docker/middleware.py
def create_chained_docker_middleware(
    middleware_chain: list[OutputMiddleware[Any]],
    include_logger: bool = True,
    logger_instance: Any | None = None,
    stdout_prefix: str = "",
    stderr_prefix: str = "",
) -> OutputMiddleware[Any]:
    """Factory function to create chained middleware for Docker operations.

    Args:
        middleware_chain: List of middleware components to chain together
        include_logger: Whether to automatically add logger middleware at the end
        logger_instance: Logger instance to use (defaults to module logger)
        stdout_prefix: Prefix for stdout lines in logger middleware
        stderr_prefix: Prefix for stderr lines in logger middleware

    Returns:
        Chained middleware instance

    """
    final_chain = list(middleware_chain)

    if include_logger:
        logger_middleware = create_logger_middleware(
            logger_instance, stdout_prefix, stderr_prefix
        )
        final_chain.append(logger_middleware)

    if len(final_chain) == 1:
        return final_chain[0]

    return create_chained_middleware(final_chain)

create_logger_middleware

create_logger_middleware(
    logger_instance=None, stdout_prefix="", stderr_prefix=""
)

Factory function to create a LoggerOutputMiddleware instance.

Parameters:

Name Type Description Default
logger_instance Any | None

Logger instance to use (defaults to module logger)

None
stdout_prefix str

Prefix for stdout lines

''
stderr_prefix str

Prefix for stderr lines

''

Returns:

Type Description
LoggerOutputMiddleware

Configured LoggerOutputMiddleware instance

Source code in ccproxy/docker/middleware.py
def create_logger_middleware(
    logger_instance: Any | None = None,
    stdout_prefix: str = "",
    stderr_prefix: str = "",
) -> LoggerOutputMiddleware:
    """Factory function to create a LoggerOutputMiddleware instance.

    Args:
        logger_instance: Logger instance to use (defaults to module logger)
        stdout_prefix: Prefix for stdout lines
        stderr_prefix: Prefix for stderr lines

    Returns:
        Configured LoggerOutputMiddleware instance
    """
    if logger_instance is None:
        logger_instance = logger
    return LoggerOutputMiddleware(logger_instance, stdout_prefix, stderr_prefix)

create_chained_middleware

create_chained_middleware(middleware_chain)

Factory function to create a chained middleware.

Parameters:

Name Type Description Default
middleware_chain list[OutputMiddleware[Any]]

List of middleware components to chain together

required

Returns:

Type Description
ChainedOutputMiddleware[Any]

ChainedOutputMiddleware instance

Raises:

Type Description
ValueError

If middleware_chain is empty

Example
from ccproxy.docker.stream_process import create_chained_middleware
from ccproxy.docker.adapter import LoggerOutputMiddleware

# Create individual middleware components
logger_middleware = LoggerOutputMiddleware(logger)

# Chain them together
chained = create_chained_middleware([logger_middleware])

# Use with docker adapter
result = docker_adapter.run_container("image", [], {}, middleware=chained)
Source code in ccproxy/docker/stream_process.py
def create_chained_middleware(
    middleware_chain: list[OutputMiddleware[Any]],
) -> ChainedOutputMiddleware[Any]:
    """Factory function to create a chained middleware.

    Args:
        middleware_chain: List of middleware components to chain together

    Returns:
        ChainedOutputMiddleware instance

    Raises:
        ValueError: If middleware_chain is empty

    Example:
        ```python
        from ccproxy.docker.stream_process import create_chained_middleware
        from ccproxy.docker.adapter import LoggerOutputMiddleware

        # Create individual middleware components
        logger_middleware = LoggerOutputMiddleware(logger)

        # Chain them together
        chained = create_chained_middleware([logger_middleware])

        # Use with docker adapter
        result = docker_adapter.run_container("image", [], {}, middleware=chained)
        ```
    """
    return ChainedOutputMiddleware(middleware_chain)

run_command async

run_command(cmd, middleware=None)

Run a command and process its output through middleware.

This function executes a command as a subprocess and streams its output through the provided middleware for real-time processing. The processed outputs are collected and returned along with the exit code.

Parameters:

Name Type Description Default
cmd str | list[str]

Command to run, either as a string or list of arguments

required
middleware OutputMiddleware[T] | None

Optional middleware for processing output (uses DefaultOutputMiddleware if None)

None

Returns:

Type Description
ProcessResult[T]

Tuple containing: - Return code from the process (0 for success) - List of processed stdout lines - List of processed stderr lines

Example
# Simple command execution
rc, stdout, stderr = await run_command("ls -l")

# With custom middleware
class CustomMiddleware(OutputMiddleware[str]):
    async def process(self, line: str, stream_type: str) -> str:
        return f"[{stream_type}] {line}"

rc, stdout, stderr = await run_command("ls -l", CustomMiddleware())
Source code in ccproxy/docker/stream_process.py
async def run_command(
    cmd: str | list[str],
    middleware: OutputMiddleware[T] | None = None,
) -> ProcessResult[T]:
    """Run a command and process its output through middleware.

    This function executes a command as a subprocess and streams its output
    through the provided middleware for real-time processing. The processed
    outputs are collected and returned along with the exit code.

    Args:
        cmd: Command to run, either as a string or list of arguments
        middleware: Optional middleware for processing output (uses DefaultOutputMiddleware if None)

    Returns:
        Tuple containing:
            - Return code from the process (0 for success)
            - List of processed stdout lines
            - List of processed stderr lines

    Example:
        ```python
        # Simple command execution
        rc, stdout, stderr = await run_command("ls -l")

        # With custom middleware
        class CustomMiddleware(OutputMiddleware[str]):
            async def process(self, line: str, stream_type: str) -> str:
                return f"[{stream_type}] {line}"

        rc, stdout, stderr = await run_command("ls -l", CustomMiddleware())
        ```
    """
    if middleware is None:
        # Cast is needed because T is unbound at this point
        middleware = cast(OutputMiddleware[T], DefaultOutputMiddleware())

    # Parse string commands into argument lists
    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # Start the async process with pipes for stdout and stderr
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    async def stream_output(stream: asyncio.StreamReader, stream_type: str) -> list[T]:
        """Process output from a stream and capture results.

        Args:
            stream: Async stream to read from (stdout or stderr)
            stream_type: Type of the stream ("stdout" or "stderr")

        Returns:
            List of processed output lines
        """
        captured: list[T] = []
        while True:
            line_bytes = await stream.readline()
            if not line_bytes:
                break
            line = line_bytes.decode().rstrip()
            if line:
                processed = await middleware.process(line, stream_type)
                if processed is not None:
                    captured.append(processed)
        return captured

    # Create async tasks for concurrent output processing
    # Ensure stdout and stderr are available
    if process.stdout is None or process.stderr is None:
        raise RuntimeError("Process stdout or stderr is None")

    stdout_task = asyncio.create_task(stream_output(process.stdout, "stdout"))
    stderr_task = asyncio.create_task(stream_output(process.stderr, "stderr"))

    # Wait for process to complete and collect output
    return_code = await process.wait()
    stdout_lines = await stdout_task
    stderr_lines = await stderr_task

    return return_code, stdout_lines, stderr_lines

create_docker_error

create_docker_error(
    message, command=None, cause=None, details=None
)

Create a DockerError with standardized context.

Parameters:

Name Type Description Default
message str

Human-readable error message

required
command str | None

Docker command that failed (optional)

None
cause Exception | None

Original exception that caused this error (optional)

None
details dict[str, Any] | None

Additional context details (optional)

None

Returns:

Type Description
DockerError

DockerError instance with all context information

Source code in ccproxy/docker/validators.py
def create_docker_error(
    message: str,
    command: str | None = None,
    cause: Exception | None = None,
    details: dict[str, Any] | None = None,
) -> DockerError:
    """Create a DockerError with standardized context.

    Args:
        message: Human-readable error message
        command: Docker command that failed (optional)
        cause: Original exception that caused this error (optional)
        details: Additional context details (optional)

    Returns:
        DockerError instance with all context information
    """
    return DockerError(
        message=message,
        command=command,
        cause=cause,
        details=details,
    )

validate_port_spec

validate_port_spec(port_spec)

Validate a Docker port specification string.

Supports formats like: - "8080:80" - "localhost:8080:80" - "127.0.0.1:8080:80" - "8080:80/tcp" - "localhost:8080:80/udp" - "[::1]:8080:80"

Parameters:

Name Type Description Default
port_spec str

Port specification string

required

Returns:

Type Description
str

Validated port specification string

Raises:

Type Description
DockerError

If port specification is invalid

Source code in ccproxy/docker/validators.py
def validate_port_spec(port_spec: str) -> str:
    """Validate a Docker port specification string.

    Supports formats like:
    - "8080:80"
    - "localhost:8080:80"
    - "127.0.0.1:8080:80"
    - "8080:80/tcp"
    - "localhost:8080:80/udp"
    - "[::1]:8080:80"

    Args:
        port_spec: Port specification string

    Returns:
        Validated port specification string

    Raises:
        DockerError: If port specification is invalid
    """
    if not port_spec or not isinstance(port_spec, str):
        raise create_docker_error(
            f"Invalid port specification: {port_spec!r}",
            details={"port_spec": port_spec},
        )

    # Remove protocol suffix for validation if present
    port_part = port_spec
    protocol = None
    if "/" in port_spec:
        port_part, protocol = port_spec.rsplit("/", 1)
        if protocol not in ("tcp", "udp"):
            raise create_docker_error(
                f"Invalid protocol in port specification: {protocol}",
                details={"port_spec": port_spec, "protocol": protocol},
            )

    # Handle IPv6 address format specially
    if port_part.startswith("["):
        # IPv6 format like [::1]:8080:80
        ipv6_end = port_part.find("]:")
        if ipv6_end == -1:
            raise create_docker_error(
                f"Invalid IPv6 port specification format: {port_spec}",
                details={
                    "port_spec": port_spec,
                    "expected_format": "[ipv6]:host_port:container_port",
                },
            )

        host_ip = port_part[: ipv6_end + 1]  # Include the closing ]
        remaining = port_part[ipv6_end + 2 :]  # Skip ]:
        port_parts = remaining.split(":")

        if len(port_parts) != 2:
            raise create_docker_error(
                f"Invalid IPv6 port specification format: {port_spec}",
                details={
                    "port_spec": port_spec,
                    "expected_format": "[ipv6]:host_port:container_port",
                },
            )

        host_port, container_port = port_parts
        parts = [host_ip, host_port, container_port]
    else:
        # Regular format
        parts = port_part.split(":")

    if len(parts) == 2:
        # Format: "host_port:container_port"
        host_port, container_port = parts
        try:
            host_port_num = int(host_port)
            container_port_num = int(container_port)
            if not (1 <= host_port_num <= 65535) or not (
                1 <= container_port_num <= 65535
            ):
                raise ValueError("Port numbers must be between 1 and 65535")
        except ValueError as e:
            raise create_docker_error(
                f"Invalid port numbers in specification: {port_spec}",
                details={"port_spec": port_spec, "error": str(e)},
            ) from e

    elif len(parts) == 3:
        # Format: "host_ip:host_port:container_port"
        host_ip, host_port, container_port = parts

        # Basic IP validation (simplified)
        if not host_ip or host_ip in (
            "localhost",
            "127.0.0.1",
            "0.0.0.0",
            "::1",
            "[::1]",
        ):
            pass  # Common valid values
        elif host_ip.startswith("[") and host_ip.endswith("]"):
            pass  # IPv6 format like [::1]
        else:
            # Basic check for IPv4-like format
            ip_parts = host_ip.split(".")
            if len(ip_parts) == 4:
                try:
                    for part in ip_parts:
                        num = int(part)
                        if not (0 <= num <= 255):
                            raise ValueError("Invalid IPv4 address")
                except ValueError as e:
                    raise create_docker_error(
                        f"Invalid host IP in port specification: {host_ip}",
                        details={
                            "port_spec": port_spec,
                            "host_ip": host_ip,
                            "error": str(e),
                        },
                    ) from e

        try:
            host_port_num = int(host_port)
            container_port_num = int(container_port)
            if not (1 <= host_port_num <= 65535) or not (
                1 <= container_port_num <= 65535
            ):
                raise ValueError("Port numbers must be between 1 and 65535")
        except ValueError as e:
            raise create_docker_error(
                f"Invalid port numbers in specification: {port_spec}",
                details={"port_spec": port_spec, "error": str(e)},
            ) from e
    else:
        raise create_docker_error(
            f"Invalid port specification format: {port_spec}",
            details={
                "port_spec": port_spec,
                "expected_format": "host_port:container_port or host_ip:host_port:container_port",
            },
        )

    return port_spec