Skip to content

ccproxy.auth.storage.json_file

ccproxy.auth.storage.json_file

JSON file storage implementation for token storage.

JsonFileTokenStorage

JsonFileTokenStorage(file_path)

Bases: TokenStorage

JSON file storage implementation for Claude credentials.

Parameters:

Name Type Description Default
file_path Path

Path to the JSON credentials file

required
Source code in ccproxy/auth/storage/json_file.py
def __init__(self, file_path: Path):
    """Create a token store backed by a single JSON file.

    Args:
        file_path: Location on disk of the JSON credentials file
    """
    # The path is stored as given; nothing is read or created at construction.
    self.file_path = file_path

load async

load()

Load credentials from the JSON file.

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Raises:

Type Description
CredentialsInvalidError

If the JSON file is invalid

CredentialsStorageError

If there's an error reading the file

Source code in ccproxy/auth/storage/json_file.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from the JSON file.

    Returns:
        Parsed credentials if found and valid, None if the file does not exist

    Raises:
        CredentialsInvalidError: If the file does not contain valid JSON
        CredentialsStorageError: If any other error occurs while loading,
            including errors raised while validating the parsed data
    """
    if not await self.exists():
        logger.debug("credentials_file_not_found", path=str(self.file_path))
        return None

    try:
        logger.debug(
            "credentials_load_start", source="file", path=str(self.file_path)
        )
        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's locale default encoding.
        with self.file_path.open(encoding="utf-8") as f:
            data = json.load(f)

        credentials = ClaudeCredentials.model_validate(data)
        logger.debug("credentials_load_completed", source="file")

        return credentials

    except FileNotFoundError:
        # The file was removed between the exists() check and open();
        # report it the same way as a file that was never there.
        logger.debug("credentials_file_not_found", path=str(self.file_path))
        return None
    except json.JSONDecodeError as e:
        raise CredentialsInvalidError(
            f"Failed to parse credentials file {self.file_path}: {e}"
        ) from e
    except Exception as e:
        raise CredentialsStorageError(
            f"Error loading credentials from {self.file_path}: {e}"
        ) from e

save async

save(credentials)

Save credentials to the JSON file.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error writing the file

Source code in ccproxy/auth/storage/json_file.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to the JSON file.

    The data is written to a sibling temp file that is restricted to the
    owner before any secret is written, then moved over the target path.

    Args:
        credentials: Credentials to save

    Returns:
        True once the file has been written; failures are raised rather
        than reported as False

    Raises:
        CredentialsStorageError: If there's an error writing the file
    """
    try:
        # Convert to dict with proper aliases
        data = credentials.model_dump(by_alias=True, mode="json")

        # Ensure parent directory exists
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

        # Write to a temp file in the same directory, then rename over the
        # target so readers never observe a half-written file.
        temp_path = self.file_path.with_suffix(".tmp")

        try:
            # Create the temp file owner-only *before* writing secrets into
            # it; chmod also tightens a leftover temp file from an earlier
            # failed run that may have looser permissions.
            temp_path.touch(mode=0o600, exist_ok=True)
            temp_path.chmod(0o600)

            with temp_path.open("w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)

            # Replace the original file with the fully written temp file
            temp_path.replace(self.file_path)

            logger.debug(
                "credentials_save_completed",
                source="file",
                path=str(self.file_path),
            )
            return True
        finally:
            # Best-effort cleanup: after a successful replace the temp file
            # is already gone, after a failure it must not be left behind.
            with contextlib.suppress(Exception):
                temp_path.unlink(missing_ok=True)

    except Exception as e:
        raise CredentialsStorageError(f"Error saving credentials: {e}") from e

exists async

exists()

Check if credentials file exists.

Returns:

Type Description
bool

True if file exists, False otherwise

Source code in ccproxy/auth/storage/json_file.py
async def exists(self) -> bool:
    """Report whether the credentials path points at an existing file.

    Returns:
        True when the path exists and is a file, False otherwise
    """
    target = self.file_path
    if not target.exists():
        return False
    # A directory (or other non-file entry) at the path does not count.
    return target.is_file()

delete async

delete()

Delete the credentials file.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error deleting the file

Source code in ccproxy/auth/storage/json_file.py
async def delete(self) -> bool:
    """Delete the credentials file.

    Returns:
        True if the file existed and was removed, False if there was no
        file to delete

    Raises:
        CredentialsStorageError: If there's an error deleting the file
    """
    try:
        if not await self.exists():
            return False

        self.file_path.unlink()
        logger.debug(
            "credentials_delete_completed",
            source="file",
            path=str(self.file_path),
        )
        return True
    except FileNotFoundError:
        # The file disappeared between the exists() check and unlink();
        # there was nothing left for this call to delete.
        return False
    except Exception as e:
        raise CredentialsStorageError(f"Error deleting credentials: {e}") from e

get_location

get_location()

Get the storage location description.

Returns:

Type Description
str

Path to the JSON file, as a string

Source code in ccproxy/auth/storage/json_file.py
def get_location(self) -> str:
    """Describe where this storage keeps its credentials.

    Returns:
        The JSON file path rendered as a string
    """
    return f"{self.file_path!s}"