Skip to content

ccproxy.auth.openai

ccproxy.auth.openai

OpenAI authentication components for Codex integration.

OpenAICredentials

Bases: BaseModel

OpenAI authentication credentials model.

parse_expires_at classmethod

parse_expires_at(v)

Parse expiration timestamp.

Source code in ccproxy/auth/openai/credentials.py
@field_validator("expires_at", mode="before")
@classmethod
def parse_expires_at(cls, v: Any) -> datetime:
    """Parse expiration timestamp."""
    if isinstance(v, datetime):
        # Ensure timezone-aware datetime
        if v.tzinfo is None:
            return v.replace(tzinfo=UTC)
        return v

    if isinstance(v, str):
        # Handle ISO format strings
        try:
            dt = datetime.fromisoformat(v.replace("Z", "+00:00"))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=UTC)
            return dt
        except ValueError as e:
            raise ValueError(f"Invalid datetime format: {v}") from e

    if isinstance(v, int | float):
        # Handle Unix timestamps
        return datetime.fromtimestamp(v, tz=UTC)

    raise ValueError(f"Cannot parse datetime from {type(v)}: {v}")

extract_account_id classmethod

extract_account_id(v, info)

Extract account ID from access token if not provided.

Source code in ccproxy/auth/openai/credentials.py
@field_validator("account_id", mode="before")
@classmethod
def extract_account_id(cls, v: Any, info: Any) -> str:
    """Extract account ID from access token if not provided."""
    if isinstance(v, str) and v:
        return v

    # Try to extract from access_token
    access_token = None
    if hasattr(info, "data") and info.data and isinstance(info.data, dict):
        access_token = info.data.get("access_token")

    if access_token and isinstance(access_token, str):
        try:
            # Decode JWT without verification to extract claims
            decoded = jwt.decode(access_token, options={"verify_signature": False})
            if "org_id" in decoded and isinstance(decoded["org_id"], str):
                return decoded["org_id"]
            elif "sub" in decoded and isinstance(decoded["sub"], str):
                return decoded["sub"]
            elif "account_id" in decoded and isinstance(decoded["account_id"], str):
                return decoded["account_id"]
        except Exception as e:
            logger.warning("Failed to extract account_id from token", error=str(e))

    raise ValueError(
        "account_id is required and could not be extracted from access_token"
    )

is_expired

is_expired()

Check if the access token is expired.

Source code in ccproxy/auth/openai/credentials.py
def is_expired(self) -> bool:
    """Check if the access token is expired."""
    now = datetime.now(UTC)
    return now >= self.expires_at

expires_in_seconds

expires_in_seconds()

Get seconds until token expires.

Source code in ccproxy/auth/openai/credentials.py
def expires_in_seconds(self) -> int:
    """Get seconds until token expires."""
    now = datetime.now(UTC)
    delta = self.expires_at - now
    return max(0, int(delta.total_seconds()))

to_dict

to_dict()

Convert to dictionary for storage.

Source code in ccproxy/auth/openai/credentials.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for storage."""
    return {
        "access_token": self.access_token,
        "refresh_token": self.refresh_token,
        "expires_at": self.expires_at.isoformat(),
        "account_id": self.account_id,
        "active": self.active,
    }

from_dict classmethod

from_dict(data)

Create from dictionary.

Source code in ccproxy/auth/openai/credentials.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "OpenAICredentials":
    """Create from dictionary."""
    return cls(**data)

OpenAITokenManager

OpenAITokenManager(storage=None)

Manages OpenAI token storage and refresh operations.

Parameters:

Name Type Description Default
storage OpenAITokenStorage | None

Token storage backend. If None, uses default TOML file storage.

None
Source code in ccproxy/auth/openai/credentials.py
def __init__(self, storage: OpenAITokenStorage | None = None):
    """Initialize token manager.

    Args:
        storage: Token storage backend. If None, uses default TOML file storage.
    """
    self.storage = storage or OpenAITokenStorage()

load_credentials async

load_credentials()

Load credentials from storage.

Source code in ccproxy/auth/openai/credentials.py
async def load_credentials(self) -> OpenAICredentials | None:
    """Load credentials from storage."""
    try:
        return await self.storage.load()
    except Exception as e:
        logger.error("Failed to load OpenAI credentials", error=str(e))
        return None

save_credentials async

save_credentials(credentials)

Save credentials to storage.

Source code in ccproxy/auth/openai/credentials.py
async def save_credentials(self, credentials: OpenAICredentials) -> bool:
    """Save credentials to storage."""
    try:
        return await self.storage.save(credentials)
    except Exception as e:
        logger.error("Failed to save OpenAI credentials", error=str(e))
        return False

delete_credentials async

delete_credentials()

Delete credentials from storage.

Source code in ccproxy/auth/openai/credentials.py
async def delete_credentials(self) -> bool:
    """Delete credentials from storage."""
    try:
        return await self.storage.delete()
    except Exception as e:
        logger.error("Failed to delete OpenAI credentials", error=str(e))
        return False

has_credentials async

has_credentials()

Check if credentials exist.

Source code in ccproxy/auth/openai/credentials.py
async def has_credentials(self) -> bool:
    """Check if credentials exist."""
    try:
        return await self.storage.exists()
    except Exception:
        return False

get_valid_token async

get_valid_token()

Get a valid access token, refreshing if necessary.

Source code in ccproxy/auth/openai/credentials.py
async def get_valid_token(self) -> str | None:
    """Get a valid access token, refreshing if necessary."""
    credentials = await self.load_credentials()
    if not credentials or not credentials.active:
        return None

    # If token is not expired, return it
    if not credentials.is_expired():
        return credentials.access_token

    # TODO: Implement token refresh logic
    # For now, return None if expired (user needs to re-authenticate)
    logger.warning("OpenAI token expired, refresh not yet implemented")
    return None

get_storage_location

get_storage_location()

Get storage location description.

Source code in ccproxy/auth/openai/credentials.py
def get_storage_location(self) -> str:
    """Get storage location description."""
    return self.storage.get_location()

OpenAIOAuthClient

OpenAIOAuthClient(settings, token_manager=None)

OpenAI OAuth PKCE flow client.

Parameters:

Name Type Description Default
settings CodexSettings

Codex configuration settings

required
token_manager OpenAITokenManager | None

Token manager for credential storage

None
Source code in ccproxy/auth/openai/oauth_client.py
def __init__(
    self, settings: CodexSettings, token_manager: OpenAITokenManager | None = None
):
    """Initialize OAuth client.

    Args:
        settings: Codex configuration settings
        token_manager: Token manager for credential storage
    """
    self.settings = settings
    self.token_manager = token_manager or OpenAITokenManager()
    self._server_task: asyncio.Task[None] | None = None
    self._auth_complete = asyncio.Event()
    self._auth_result: OpenAICredentials | None = None
    self._auth_error: str | None = None

authenticate async

authenticate(open_browser=True)

Perform OAuth PKCE flow.

Parameters:

Name Type Description Default
open_browser bool

Whether to automatically open browser

True

Returns:

Type Description
OpenAICredentials

OpenAI credentials

Raises:

Type Description
ValueError

If authentication fails

Source code in ccproxy/auth/openai/oauth_client.py
async def authenticate(self, open_browser: bool = True) -> OpenAICredentials:
    """Perform OAuth PKCE flow.

    Args:
        open_browser: Whether to automatically open browser

    Returns:
        OpenAI credentials

    Raises:
        ValueError: If authentication fails
    """
    # Reset state
    self._auth_complete.clear()
    self._auth_result = None
    self._auth_error = None

    # Generate PKCE parameters
    code_verifier, code_challenge = self._generate_pkce_pair()
    state = secrets.token_urlsafe(32)

    # Create callback app
    app = self._create_callback_app(code_verifier, state)

    # Start callback server
    self._server_task = asyncio.create_task(self._run_callback_server(app))

    # Give server time to start
    await asyncio.sleep(1)

    # Build authorization URL
    auth_url = self._build_auth_url(code_challenge, state)

    logger.info("Starting OpenAI OAuth flow")
    print("\nPlease visit this URL to authenticate with OpenAI:")
    print(f"{auth_url}\n")

    if open_browser:
        try:
            webbrowser.open(auth_url)
            print("Opening browser...")
        except Exception as e:
            logger.warning("Failed to open browser automatically", error=str(e))
            print("Please copy and paste the URL above into your browser.")

    print("Waiting for authentication to complete...")

    try:
        # Wait for authentication to complete (with timeout)
        await asyncio.wait_for(self._auth_complete.wait(), timeout=300)  # 5 minutes

        if self._auth_error:
            raise ValueError(self._auth_error)

        if not self._auth_result:
            raise ValueError("Authentication completed but no credentials received")

        logger.info("OpenAI authentication successful")  # type: ignore[unreachable]
        return self._auth_result

    except TimeoutError as e:
        raise ValueError("Authentication timed out (5 minutes)") from e
    finally:
        # Clean up server
        if self._server_task and not self._server_task.done():
            self._server_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._server_task

OpenAITokenStorage

OpenAITokenStorage(file_path=None)

JSON file-based storage for OpenAI credentials using Codex format.

Parameters:

Name Type Description Default
file_path Path | None

Path to JSON file. If None, uses ~/.codex/auth.json

None
Source code in ccproxy/auth/openai/storage.py
def __init__(self, file_path: Path | None = None):
    """Initialize storage with file path.

    Args:
        file_path: Path to JSON file. If None, uses ~/.codex/auth.json
    """
    self.file_path = file_path or Path.home() / ".codex" / "auth.json"

load async

load()

Load credentials from Codex JSON file.

Source code in ccproxy/auth/openai/storage.py
async def load(self) -> "OpenAICredentials | None":
    """Load credentials from Codex JSON file."""
    if not self.file_path.exists():
        return None

    try:
        with self.file_path.open("r") as f:
            data = json.load(f)

        # Extract tokens section
        tokens = data.get("tokens", {})
        if not tokens:
            logger.warning("No tokens section found in Codex auth file")
            return None

        # Get required fields
        access_token = tokens.get("access_token")
        refresh_token = tokens.get("refresh_token")
        account_id = tokens.get("account_id")

        if not access_token:
            logger.warning("No access_token found in Codex auth file")
            return None

        # Extract expiration from JWT token
        expires_at = self._extract_expiration_from_token(access_token)
        if not expires_at:
            logger.warning("Could not extract expiration from access token")
            return None

        # Import here to avoid circular import
        from .credentials import OpenAICredentials

        # Create credentials object
        credentials_data = {
            "access_token": access_token,
            "refresh_token": refresh_token or "",
            "expires_at": expires_at,
            "account_id": account_id or "",
            "active": True,
        }

        return OpenAICredentials.from_dict(credentials_data)

    except Exception as e:
        logger.error(
            "Failed to load OpenAI credentials from Codex auth file",
            file_path=str(self.file_path),
            error=str(e),
        )
        return None

save async

save(credentials)

Save credentials to Codex JSON file.

Source code in ccproxy/auth/openai/storage.py
async def save(self, credentials: "OpenAICredentials") -> bool:
    """Save credentials to Codex JSON file."""
    try:
        # Create directory if it doesn't exist
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

        # Load existing file or create new structure
        existing_data: dict[str, Any] = {}
        if self.file_path.exists():
            try:
                with self.file_path.open("r") as f:
                    existing_data = json.load(f)
            except Exception:
                logger.warning(
                    "Could not load existing auth file, creating new one"
                )

        # Prepare Codex JSON data structure
        codex_data = {
            "OPENAI_API_KEY": existing_data.get("OPENAI_API_KEY"),
            "tokens": {
                "id_token": existing_data.get("tokens", {}).get("id_token"),
                "access_token": credentials.access_token,
                "refresh_token": credentials.refresh_token,
                "account_id": credentials.account_id,
            },
            "last_refresh": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
        }

        # Write atomically by writing to temp file then renaming
        temp_file = self.file_path.with_suffix(f"{self.file_path.suffix}.tmp")

        with temp_file.open("w") as f:
            json.dump(codex_data, f, indent=2)

        # Set restrictive permissions (readable only by owner)
        temp_file.chmod(0o600)

        # Atomic rename
        temp_file.replace(self.file_path)

        logger.info(
            "Saved OpenAI credentials to Codex auth file",
            file_path=str(self.file_path),
        )
        return True

    except Exception as e:
        logger.error(
            "Failed to save OpenAI credentials to Codex auth file",
            file_path=str(self.file_path),
            error=str(e),
        )
        # Clean up temp file if it exists
        temp_file = self.file_path.with_suffix(f"{self.file_path.suffix}.tmp")
        if temp_file.exists():
            with contextlib.suppress(Exception):
                temp_file.unlink()
        return False

exists async

exists()

Check if credentials file exists.

Source code in ccproxy/auth/openai/storage.py
async def exists(self) -> bool:
    """Check if credentials file exists."""
    if not self.file_path.exists():
        return False

    try:
        with self.file_path.open("r") as f:
            data = json.load(f)
        tokens = data.get("tokens", {})
        return bool(tokens.get("access_token"))
    except Exception:
        return False

delete async

delete()

Delete credentials file.

Source code in ccproxy/auth/openai/storage.py
async def delete(self) -> bool:
    """Delete credentials file."""
    try:
        if self.file_path.exists():
            self.file_path.unlink()
            logger.info("Deleted Codex auth file", file_path=str(self.file_path))
        return True
    except Exception as e:
        logger.error(
            "Failed to delete Codex auth file",
            file_path=str(self.file_path),
            error=str(e),
        )
        return False

get_location

get_location()

Get storage location description.

Source code in ccproxy/auth/openai/storage.py
def get_location(self) -> str:
    """Get storage location description."""
    return str(self.file_path)