# ccproxy.plugins.duckdb_storage.storage

Simplified DuckDB storage for low-traffic environments.

This module provides a simple, direct DuckDB storage implementation without connection pooling or batch processing. Suitable for dev environments with low request rates (< 10 req/s).

SimpleDuckDBStorage

SimpleDuckDBStorage(database_path='data/metrics.duckdb')

Simple DuckDB storage with queue-based writes to prevent deadlocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `database_path` | `str \| Path` | Path to DuckDB database file | `'data/metrics.duckdb'` |
Source code in ccproxy/plugins/duckdb_storage/storage.py
def __init__(self, database_path: str | Path = "data/metrics.duckdb"):
    """Prepare queue-backed DuckDB storage state; no disk I/O happens here.

    Args:
        database_path: Path to DuckDB database file
    """
    self.database_path = Path(database_path)

    # Connection state: the engine is created lazily by initialize().
    self._engine: Engine | None = None
    self._initialized: bool = False

    # Write path: all rows are funneled through one queue and drained by
    # a single background worker, so DuckDB never sees concurrent writers.
    self._write_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
    self._background_worker_task: asyncio.Task[None] | None = None

    # Shutdown coordination: close() sets the event and enqueues the
    # sentinel so the worker's blocking queue.get() wakes immediately.
    self._shutdown_event = asyncio.Event()
    self._sentinel: object = object()

initialize async

initialize()

Initialize the storage backend.

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def initialize(self) -> None:
    """Initialize the storage backend (idempotent).

    Creates the data directory, opens the DuckDB engine, builds the
    schema, and starts the background queue worker. Errors are logged
    with a category-specific event name and re-raised.
    """
    if self._initialized:
        return

    try:
        # A fresh install may not have the data directory yet.
        self.database_path.parent.mkdir(parents=True, exist_ok=True)

        self._engine = create_engine(f"duckdb:///{self.database_path}")

        # Schema creation is synchronous and must precede any writes.
        self._create_schema_sync()

        # Single consumer task that drains the write queue.
        self._background_worker_task = await create_managed_task(
            self._background_worker(),
            name="duckdb_background_worker",
            creator="SimpleDuckDBStorage",
        )

        self._initialized = True
        logger.debug(
            "simple_duckdb_initialized", database_path=str(self.database_path)
        )

    except OSError as os_err:
        logger.error("simple_duckdb_init_io_error", error=str(os_err), exc_info=os_err)
        raise
    except SQLAlchemyError as db_err:
        logger.error("simple_duckdb_init_db_error", error=str(db_err), exc_info=db_err)
        raise
    except Exception as unexpected:
        logger.error("simple_duckdb_init_error", error=str(unexpected), exc_info=unexpected)
        raise

optimize

optimize()

Run PRAGMA optimize on the database engine if available.

This is a lightweight maintenance step to improve performance and reclaim space in DuckDB. Safe to call on file-backed databases.

Source code in ccproxy/plugins/duckdb_storage/storage.py
def optimize(self) -> None:
    """Run PRAGMA optimize on the database engine if available.

    A lightweight maintenance pass for file-backed DuckDB databases.
    No-op when no engine exists; failures are logged and swallowed
    because optimization is never required for correctness.
    """
    engine = self._engine
    if engine is None:
        return

    try:
        with engine.connect() as conn:
            conn.exec_driver_sql("PRAGMA optimize")
            logger.debug("duckdb_optimize_completed")
    except Exception as exc:  # pragma: no cover - non-critical maintenance
        logger.warning("duckdb_optimize_failed", error=str(exc), exc_info=exc)

store_request async

store_request(data)

Store a single request log entry asynchronously via queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Mapping[str, Any]` | Request data to store | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if queued successfully |

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def store_request(self, data: Mapping[str, Any]) -> bool:
    """Queue a single request log entry for background persistence.

    Args:
        data: Request data to store

    Returns:
        True if queued successfully
    """
    if not self._initialized:
        return False

    try:
        # Copy so later caller-side mutation cannot affect the queued row.
        await self._write_queue.put(dict(data))
    except asyncio.QueueFull as exc:
        # Defensive only: the queue is created unbounded in __init__, and
        # even on a bounded queue `await put()` blocks rather than raising.
        logger.error(
            "queue_store_full_error",
            error=str(exc),
            request_id=data.get("request_id"),
            exc_info=exc,
        )
        return False
    except Exception as exc:
        logger.error(
            "queue_store_error",
            error=str(exc),
            request_id=data.get("request_id"),
            exc_info=exc,
        )
        return False
    else:
        return True

store_batch async

store_batch(metrics)

Store a batch of request logs.

Parameters:

Name Type Description Default
metrics Sequence[dict[str, Any]]

List of metric data entries

required

Returns:

Type Description
bool

True if stored successfully

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def store_batch(self, metrics: Sequence[dict[str, Any]]) -> bool:
    """Persist a batch of request logs into the access_logs table.

    Args:
        metrics: List of metric data entries

    Returns:
        True if stored successfully
    """
    if not self._initialized or not self._engine:
        return False

    def _as_row(entry: dict[str, Any]) -> dict[str, Any]:
        # Map one loosely-typed metric dict onto the access_logs column
        # set, converting epoch-seconds timestamps to datetime objects.
        ts = entry.get("timestamp", time.time())
        if isinstance(ts, int | float):
            ts = datetime.fromtimestamp(ts)
        return {
            "request_id": entry.get("request_id", ""),
            "timestamp": ts,
            "method": entry.get("method", ""),
            "endpoint": entry.get("endpoint", ""),
            "path": entry.get("path", entry.get("endpoint", "")),
            "query": entry.get("query", ""),
            "client_ip": entry.get("client_ip", ""),
            "user_agent": entry.get("user_agent", ""),
            "service_type": entry.get("service_type", ""),
            "provider": entry.get("provider", ""),
            "model": entry.get("model", ""),
            "streaming": entry.get("streaming", False),
            "status_code": entry.get("status_code", 200),
            "duration_ms": entry.get("duration_ms", 0.0),
            "duration_seconds": entry.get("duration_seconds", 0.0),
            "tokens_input": entry.get("tokens_input", 0),
            "tokens_output": entry.get("tokens_output", 0),
            "cache_read_tokens": entry.get("cache_read_tokens", 0),
            "cache_write_tokens": entry.get("cache_write_tokens", 0),
            "cost_usd": entry.get("cost_usd", 0.0),
            "cost_sdk_usd": entry.get("cost_sdk_usd", 0.0),
        }

    try:
        rows = [_as_row(entry) for entry in metrics]

        table = SQLModel.metadata.tables.get("access_logs")
        if table is None:
            raise RuntimeError(
                "access_logs table not registered; ensure analytics plugin is enabled"
            )

        # One session/transaction for the whole batch; passing the row list
        # alongside insert() lets SQLAlchemy bind all rows in one statement.
        with Session(self._engine) as session:
            cast(Any, session).exec(insert(table), rows)
            session.commit()

        logger.info(
            "simple_duckdb_batch_store_success",
            batch_size=len(metrics),
            service_types=[m.get("service_type", "") for m in metrics[:3]],
            request_ids=[m.get("request_id", "") for m in metrics[:3]],
        )
        return True

    except IntegrityError as exc:
        logger.error(
            "simple_duckdb_store_batch_integrity_error",
            error=str(exc),
            metric_count=len(metrics),
            exc_info=exc,
        )
        return False
    except OperationalError as exc:
        logger.error(
            "simple_duckdb_store_batch_operational_error",
            error=str(exc),
            metric_count=len(metrics),
            exc_info=exc,
        )
        return False
    except SQLAlchemyError as exc:
        logger.error(
            "simple_duckdb_store_batch_db_error",
            error=str(exc),
            metric_count=len(metrics),
            exc_info=exc,
        )
        return False
    except Exception as exc:
        logger.error(
            "simple_duckdb_store_batch_error",
            error=str(exc),
            metric_count=len(metrics),
            exc_info=exc,
        )
        return False

store async

store(metric)

Store single metric.

Parameters:

Name Type Description Default
metric dict[str, Any]

Metric data to store

required

Returns:

Type Description
bool

True if stored successfully

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def store(self, metric: dict[str, Any]) -> bool:
    """Store single metric.

    Delegates to store_batch with a one-element batch so single and
    bulk writes share one code path.

    Args:
        metric: Metric data to store

    Returns:
        True if stored successfully
    """
    batch = [metric]
    return await self.store_batch(batch)

close async

close()

Close the database connection and stop background worker.

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def close(self) -> None:
    """Close the database connection and stop background worker.

    Shutdown sequence:
      1. Set the shutdown event and push the sentinel so the worker's
         blocking ``queue.get()`` wakes immediately.
      2. Wait (bounded, 5s) for the worker task to exit; cancel on timeout.
      3. Drain remaining queued items with a bounded ``queue.join()`` (2s).
      4. Dispose the SQLAlchemy engine and mark the store uninitialized.

    Errors are logged but never raised, so close() is safe in cleanup paths.
    """
    # Signal shutdown to background worker
    self._shutdown_event.set()

    # Wake up background worker immediately if it's waiting on queue.get()
    # (the sentinel is not a real payload dict, hence the type: ignore).
    with contextlib.suppress(Exception):
        self._write_queue.put_nowait(self._sentinel)  # type: ignore[arg-type]

    # Wait for background worker to finish
    if self._background_worker_task:
        try:
            await asyncio.wait_for(self._background_worker_task, timeout=5.0)
        except TimeoutError:
            # NOTE(review): catching built-in TimeoutError assumes Python
            # 3.11+ where asyncio.TimeoutError is the same class — confirm
            # the project's minimum supported version.
            logger.warning("background_worker_shutdown_timeout")
            self._background_worker_task.cancel()
        except asyncio.CancelledError:
            logger.info("background_worker_shutdown_cancelled")
        except Exception as e:
            logger.error(
                "background_worker_shutdown_error", error=str(e), exc_info=e
            )

    # Process remaining items in queue (with timeout)
    # NOTE(review): once the worker has exited, nothing calls task_done()
    # anymore, so this join() can only end via the timeout if items (or the
    # sentinel) remain unfinished — verify the worker drains the queue and
    # marks the sentinel done before exiting.
    try:
        await asyncio.wait_for(self._write_queue.join(), timeout=2.0)
    except TimeoutError:
        logger.warning(
            "queue_drain_timeout", remaining_items=self._write_queue.qsize()
        )

    if self._engine:
        try:
            self._engine.dispose()
        except SQLAlchemyError as e:
            logger.error(
                "simple_duckdb_engine_close_db_error", error=str(e), exc_info=e
            )
        except Exception as e:
            logger.error(
                "simple_duckdb_engine_close_error", error=str(e), exc_info=e
            )
        finally:
            # Always drop the reference so a later initialize() starts clean.
            self._engine = None

    self._initialized = False

is_enabled

is_enabled()

Check if storage is enabled and available.

Source code in ccproxy/plugins/duckdb_storage/storage.py
def is_enabled(self) -> bool:
    """Report whether the backend finished initialize() and is usable."""
    return self._initialized

health_check async

health_check()

Get health status of the storage backend.

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def health_check(self) -> dict[str, Any]:
    """Get health status of the storage backend.

    Returns a status dict; never raises — database errors are folded
    into an ``unhealthy`` payload with an ``error_type`` tag.
    """
    if not self._initialized:
        return {"status": "not_initialized", "enabled": False}

    try:
        if not self._engine:
            return {"status": "no_connection", "enabled": False}

        # The count query is synchronous; off-load it to a worker thread
        # so the event loop is not blocked.
        access_log_count = await asyncio.to_thread(self._health_check_sync)
        return {
            "status": "healthy",
            "enabled": True,
            "database_path": str(self.database_path),
            "access_log_count": access_log_count,
            "backend": "sqlmodel",
        }
    except SQLAlchemyError as exc:
        return {
            "status": "unhealthy",
            "enabled": False,
            "error": str(exc),
            "error_type": "database",
        }
    except Exception as exc:
        return {
            "status": "unhealthy",
            "enabled": False,
            "error": str(exc),
            "error_type": "unknown",
        }

reset_data async

reset_data()

Reset all data in the storage (useful for testing/debugging).

Returns:

Type Description
bool

True if reset was successful

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def reset_data(self) -> bool:
    """Delete all stored data (intended for testing/debugging).

    Returns:
        True if reset was successful
    """
    if not (self._initialized and self._engine):
        return False

    try:
        # The truncation is synchronous; run it off the event loop.
        return await asyncio.to_thread(self._reset_data_sync)
    except SQLAlchemyError as exc:
        logger.error("simple_duckdb_reset_db_error", error=str(exc), exc_info=exc)
        return False
    except Exception as exc:
        logger.error("simple_duckdb_reset_error", error=str(exc), exc_info=exc)
        return False

wait_for_queue_processing async

wait_for_queue_processing(timeout=5.0)

Wait for all queued items to be processed by the background worker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `float` | Maximum time to wait in seconds | `5.0` |

Raises:

| Type | Description |
| --- | --- |
| `TimeoutError` | If processing doesn't complete within timeout |

Source code in ccproxy/plugins/duckdb_storage/storage.py
async def wait_for_queue_processing(self, timeout: float = 5.0) -> None:
    """Block until the background worker has drained the write queue.

    Args:
        timeout: Maximum time to wait in seconds

    Raises:
        asyncio.TimeoutError: If processing doesn't complete within timeout
    """
    # Nothing to wait for before initialization or once shutdown began.
    if self._shutdown_event.is_set() or not self._initialized:
        return

    await asyncio.wait_for(self._write_queue.join(), timeout=timeout)