Skip to content

ccproxy.plugins.access_log.writer

ccproxy.plugins.access_log.writer

AccessLogWriter

AccessLogWriter(
    log_file, buffer_size=100, flush_interval=1.0
)

Simple async file writer for access logs.

Features: - Async file I/O for performance - Optional buffering to reduce I/O operations - Thread-safe with asyncio.Lock - Auto-creates parent directories

Parameters:

Name Type Description Default
log_file str

Path to the log file

required
buffer_size int

Number of entries to buffer before writing

100
flush_interval float

Time in seconds between automatic flushes

1.0
Source code in ccproxy/plugins/access_log/writer.py
def __init__(
    self,
    log_file: str,
    buffer_size: int = 100,
    flush_interval: float = 1.0,
):
    """Initialize the writer.

    Args:
        log_file: Path to the log file
        buffer_size: Number of entries to buffer before writing
        flush_interval: Time in seconds between automatic flushes
    """
    self.log_file = Path(log_file)
    self.buffer_size = buffer_size
    self.flush_interval = flush_interval

    self._buffer: list[str] = []
    self._lock = asyncio.Lock()
    self._flush_task: asyncio.Task[None] | None = None
    self._last_flush = time.time()

    # Ensure parent directory exists
    self.log_file.parent.mkdir(parents=True, exist_ok=True)

write async

write(line)

Write a line to the log file.

Lines are buffered and written in batches for performance.

Parameters:

Name Type Description Default
line str

The formatted log line to write

required
Source code in ccproxy/plugins/access_log/writer.py
async def write(self, line: str) -> None:
    """Write a line to the log file.

    Lines are buffered and written in batches for performance.

    Args:
        line: The formatted log line to write
    """
    async with self._lock:
        self._buffer.append(line)

        # Flush if buffer is full
        if len(self._buffer) >= self.buffer_size:
            await self._flush()
        else:
            # Schedule a flush if not already scheduled
            self._schedule_flush()

close async

close()

Close the writer and flush any remaining data.

Source code in ccproxy/plugins/access_log/writer.py
async def close(self) -> None:
    """Close the writer and flush any remaining data."""
    async with self._lock:
        await self._flush()

    if self._flush_task and not self._flush_task.done():
        self._flush_task.cancel()