async def dispatch(self, request: Request, call_next: Any) -> Response:
    """Dispatch the request, emitting lifecycle hooks around the downstream call.

    REQUEST_STARTED and the HTTP request hook are emitted before ``call_next``
    runs. For a regular response the HTTP response hook and REQUEST_COMPLETED
    are emitted here. A streaming response is re-wrapped in
    ``StreamingResponseWithHooks``, which is handed the hook manager so that
    completion can be reported once the stream finishes. If anything inside
    the guarded section raises, REQUEST_FAILED is emitted (best effort) and
    the original exception is re-raised.

    Args:
        request: The incoming request
        call_next: The next middleware/handler in the chain

    Returns:
        The response from downstream handlers, or a hook-emitting wrapper
        around it when the downstream response is streaming

    Raises:
        Exception: Any error raised by hook emission or downstream handlers
            is re-raised unchanged after REQUEST_FAILED has been attempted
    """
    # Get hook manager from app state if not set during init
    hook_manager = self.hook_manager
    if not hook_manager and hasattr(request.app.state, "hook_manager"):
        hook_manager = request.app.state.hook_manager

    # Skip hook emission if no hook manager available
    if not hook_manager:
        return cast(Response, await call_next(request))

    # Prefer a request_id already stored on request.state
    request_id = getattr(request.state, "request_id", None)
    if not request_id:
        # Fall back to the header, then to a generated id. `or` is used instead
        # of a .get() default so an empty header value also triggers the fallback.
        request_id = (
            request.headers.get("X-Request-ID") or f"req-{int(time.time() * 1000)}"
        )

    # Get or create RequestContext
    from ccproxy.core.request_context import RequestContext

    request_context = RequestContext.get_current()
    if not request_context:
        # Create minimal context if none exists (perf_counter based start time)
        request_context = RequestContext(
            request_id=request_id,
            start_time=time.perf_counter(),
            logger=logger,
        )

    # Wall-clock time for human-readable timestamps
    start_time = time.time()

    # Never write credentials to the log: mask the values of headers that
    # carry secrets while still showing that the header was present.
    sensitive_headers = {
        "authorization",
        "proxy-authorization",
        "cookie",
        "x-api-key",
    }
    logger.debug(
        "headers_on_request_start",
        headers={
            name: "[REDACTED]" if name.lower() in sensitive_headers else value
            for name, value in dict(request.headers).items()
        },
    )

    # Evaluated once; reused by every hook payload and log line below
    method = request.method
    url = str(request.url)

    # Create hook context for the request
    hook_context = HookContext(
        event=HookEvent.REQUEST_STARTED,  # Will be overridden in emit calls
        timestamp=datetime.fromtimestamp(start_time),
        data={
            "request_id": request_id,
            "method": method,
            "url": url,
            # Extract headers using utility function
            "headers": extract_request_headers(request),
        },
        metadata=getattr(request_context, "metadata", {}),
        request=request,
    )

    try:
        # Emit REQUEST_STARTED before processing
        await hook_manager.emit_with_context(hook_context)

        # Capture and emit HTTP_REQUEST hook with body
        (
            body_preview,
            body_size,
            body_truncated,
            body_is_json,
        ) = await self._emit_http_request_hook(hook_manager, request, hook_context)

        # Clients asking for SSE get "sse_connection_started" later instead
        accept_header = request.headers.get("accept", "").lower()
        if "text/event-stream" not in accept_header:
            logger.info(
                "request_started",
                request_id=request_id,
                method=method,
                url=url,
                has_body=body_preview is not None,
                body_size=body_size,
                body_truncated=body_truncated,
                is_json=body_is_json,
                origin="client",
                streaming=False,
                category="http",
            )

        # Process the request
        response = cast(Response, await call_next(request))
        end_time = time.time()

        # Check if it's a streaming response (including middleware wrapped
        # streaming responses, matched by class name)
        is_streaming = (
            isinstance(response, StreamingResponse)
            or type(response).__name__ == "_StreamingResponse"
        )
        logger.debug(
            "hooks_middleware_checking_response_type",
            response_type=type(response).__name__,
            response_class=str(type(response)),
            is_streaming=is_streaming,
            request_id=request_id,
        )

        if is_streaming:
            # For streaming responses, wrap with hook emission on completion.
            # Don't emit REQUEST_COMPLETED here - it is left to the wrapper.
            logger.debug(
                "hooks_middleware_wrapping_streaming_response",
                request_id=request_id,
                method=method,
                url=url,
                status_code=getattr(response, "status_code", 200),
                duration=end_time - start_time,
                response_type="streaming",
                category="hooks",
            )

            request_data = {
                "method": method,
                "url": url,
                "headers": extract_request_headers(request),
            }
            # request_context is always set by this point, so no guard is needed
            request_metadata: dict[str, Any] = getattr(
                request_context, "metadata", {}
            )

            response_stream = cast(StreamingResponse, response)
            is_sse = self._is_sse_response(response_stream)
            if is_sse:
                logger.info(
                    "sse_connection_started",
                    request_id=request_id,
                    method=method,
                    url=url,
                    origin="client",
                    streaming=True,
                    has_body=body_preview is not None,
                    body_size=body_size,
                    body_truncated=body_truncated,
                    is_json=body_is_json,
                    category="http",
                )

            # Coerce body iterator to AsyncGenerator[bytes]
            async def _coerce_bytes() -> Any:
                async for chunk in response_stream.body_iterator:
                    if isinstance(chunk, bytes):
                        yield chunk
                    elif isinstance(chunk, memoryview):
                        yield bytes(chunk)
                    else:
                        yield str(chunk).encode("utf-8", errors="replace")

            return StreamingResponseWithHooks(
                content=_coerce_bytes(),
                hook_manager=hook_manager,
                request_id=request_id,
                request_data=request_data,
                request_metadata=request_metadata,
                start_time=start_time,
                status_code=response_stream.status_code,
                origin="client",
                is_sse=is_sse,
                headers=dict(response_stream.headers),
                media_type=response_stream.media_type,
            )

        # Regular response: the completion context is only needed on this
        # path, so it is built here rather than before the streaming check.
        # NOTE(review): timestamp is the request start, not end_time - confirm
        # that consumers expect this for REQUEST_COMPLETED.
        response_hook_context = HookContext(
            event=HookEvent.REQUEST_COMPLETED,  # Will be overridden in emit calls
            timestamp=datetime.fromtimestamp(start_time),
            data={
                "request_id": request_id,
                "method": method,
                "url": url,
                "headers": extract_request_headers(request),
                "response_status": getattr(response, "status_code", 200),
                # Response headers preserved via extract_response_headers
                "response_headers": extract_response_headers(response),
                "duration": end_time - start_time,
            },
            metadata=getattr(request_context, "metadata", {}),
            request=request,
            response=response,
        )

        # Emit HTTP_RESPONSE and REQUEST_COMPLETED
        await self._emit_http_response_hook(
            hook_manager, request, response, hook_context
        )
        await hook_manager.emit_with_context(response_hook_context)

        duration_ms = round((end_time - start_time) * 1000, 3)
        logger.info(
            "request_completed",
            request_id=request_id,
            method=method,
            url=url,
            status_code=getattr(response, "status_code", 200),
            duration_ms=duration_ms,
            origin="client",
            streaming=False,
            success=True,
            category="http",
        )
        logger.debug(
            "hooks_middleware_request_completed",
            request_id=request_id,
            method=method,
            url=url,
            status_code=getattr(response, "status_code", 200),
            duration=end_time - start_time,
            response_type="regular",
            category="hooks",
        )
        return response

    except Exception as e:
        # Build the failure context with error information
        end_time = time.time()
        # NOTE(review): timestamp is the request start, not end_time - confirm
        # that consumers expect this for REQUEST_FAILED.
        error_hook_context = HookContext(
            event=HookEvent.REQUEST_FAILED,  # Will be overridden in emit calls
            timestamp=datetime.fromtimestamp(start_time),
            data={
                "request_id": request_id,
                "method": method,
                "url": url,
                "headers": extract_request_headers(request),
                "duration": end_time - start_time,
            },
            metadata=getattr(request_context, "metadata", {}),
            request=request,
            error=e,
        )

        # Emit REQUEST_FAILED on error; a failing hook must not mask the
        # original exception, so it is logged and otherwise ignored.
        try:
            await hook_manager.emit_with_context(error_hook_context)
        except Exception as hook_error:
            logger.error(
                "hooks_middleware_hook_emission_failed",
                request_id=request_id,
                original_error=str(e),
                hook_error=str(hook_error),
                category="hooks",
            )

        logger.debug(
            "hooks_middleware_request_failed",
            request_id=request_id,
            method=method,
            url=url,
            error=str(e),
            duration=end_time - start_time,
            category="hooks",
        )

        duration_ms = round((end_time - start_time) * 1000, 3)
        # Only some exception types carry a status_code; None otherwise
        status_code = getattr(e, "status_code", None)
        logger.info(
            "request_completed",
            request_id=request_id,
            method=method,
            url=url,
            status_code=status_code,
            duration_ms=duration_ms,
            origin="client",
            streaming=False,
            success=False,
            error_type=type(e).__name__,
            category="http",
        )

        # Re-raise the original exception
        raise