async def handle_request(
    self, request: Request
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle an inbound request end-to-end: detect streaming, apply the
    configured format-conversion chain, forward to the provider, and map
    the provider response back for the client.

    Flow (mirrors the numbered step comments below):
      1. Read the body, map the requested model, and extract headers.
      2. Early streaming detection via Accept/content-type header or a
         ``stream: true`` flag in the JSON body; streaming requests are
         delegated to ``handle_streaming`` and return immediately.
      3. For non-streaming requests with a multi-step format chain,
         convert the request payload between API formats.
      4. Provider-specific request preparation (headers/body).
      5. Execute the upstream HTTP request.
      6. Provider-specific response processing.
      7. Convert the response back through the format chain (if any),
         restore model aliases, and strip stale encoding headers.

    Args:
        request: The incoming ASGI request. ``request.state.context`` is
            expected to be populated by upstream middleware.

    Returns:
        A ``Response`` for buffered replies, a ``StreamingResponse`` for
        streamed replies, or a ``DeferredStreaming`` (presumably produced
        by ``handle_streaming`` / ``_convert_streaming_response`` — the
        deferred path is not visible in this method body).
    """
    # Get context from middleware (already initialized)
    ctx = request.state.context
    self._ensure_tool_accumulator(ctx)

    # Step 1: Extract request data
    body = await request.body()
    # May rewrite the model name in the body (aliasing); the reverse
    # mapping is applied on the way out via _restore_model_response.
    body = await self._map_request_model(ctx, body)
    headers = extract_request_headers(request)
    method = request.method
    endpoint = ctx.metadata.get("endpoint", "")

    # Fail fast if a format chain is configured without a registry
    self._ensure_format_registry(ctx.format_chain, endpoint)

    # Extra debug breadcrumbs to confirm code path and detection inputs
    logger.debug(
        "http_adapter_handle_request_entry",
        endpoint=endpoint,
        method=method,
        content_type=headers.get("content-type"),
        has_streaming_handler=bool(self.streaming_handler),
        category="stream_detection",
    )

    # Step 2: Early streaming detection
    if self.streaming_handler:
        logger.debug(
            "checking_should_stream",
            endpoint=endpoint,
            has_streaming_handler=True,
            content_type=headers.get("content-type"),
            category="stream_detection",
        )
        # Detect streaming via Accept header and/or body flag stream:true
        body_wants_stream = False
        parsed_payload: dict[str, Any] | None = None
        try:
            parsed_payload = json.loads(body.decode()) if body else {}
            body_wants_stream = bool(parsed_payload.get("stream", False))
        except Exception:
            # Unparseable/non-JSON body: fall back to header detection only.
            body_wants_stream = False
        header_wants_stream = self.streaming_handler.should_stream_response(headers)
        logger.debug(
            "should_stream_results",
            body_wants_stream=body_wants_stream,
            header_wants_stream=header_wants_stream,
            endpoint=endpoint,
            category="stream_detection",
        )
        if body_wants_stream or header_wants_stream:
            logger.debug(
                "streaming_request_detected",
                endpoint=endpoint,
                detected_via=(
                    "content_type_sse"
                    if header_wants_stream
                    else "body_stream_flag"
                ),
                category="stream_detection",
            )
            # Record tool definitions before handing off, since the
            # streaming path bypasses the non-streaming parse below.
            if isinstance(parsed_payload, dict):
                self._record_tool_definitions(ctx, parsed_payload)
            return await self.handle_streaming(request, endpoint)
        else:
            logger.debug(
                "not_streaming_request",
                endpoint=endpoint,
                category="stream_detection",
            )

    # Step 3: Execute format chain if specified (non-streaming)
    # A chain of length 1 is the identity (source == target format), so
    # conversion only happens for chains with at least one hop.
    request_payload: dict[str, Any] | None = None
    if ctx.format_chain and len(ctx.format_chain) > 1:
        try:
            request_payload = self._decode_json_body(body, context="request")
        except ValueError as exc:
            logger.error(
                "format_chain_request_parse_failed",
                error=str(exc),
                endpoint=endpoint,
                category="transform",
            )
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to parse request body for format conversion",
                        "details": str(exc),
                    }
                },
            )
        self._record_tool_definitions(ctx, request_payload)
        try:
            logger.debug(
                "format_chain_request_about_to_convert",
                chain=ctx.format_chain,
                endpoint=endpoint,
                category="transform",
            )
            request_payload = await self._apply_format_chain(
                data=request_payload,
                format_chain=ctx.format_chain,
                stage="request",
            )
            # Re-encode so downstream steps forward the converted payload.
            body = self._encode_json_body(request_payload)
            logger.trace(
                "format_chain_request_converted",
                from_format=ctx.format_chain[0],
                to_format=ctx.format_chain[-1],
                keys=list(request_payload.keys()),
                size_bytes=len(body),
                category="transform",
            )
            logger.info(
                "format_chain_applied",
                stage="request",
                endpoint=endpoint,
                chain=ctx.format_chain,
                steps=len(ctx.format_chain) - 1,
                category="format",
            )
        except Exception as e:
            logger.error(
                "format_chain_request_failed",
                error=str(e),
                endpoint=endpoint,
                exc_info=e,
                category="transform",
            )
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to convert request using format chain",
                        "details": str(e),
                    }
                },
            )

    # Step 4: Provider-specific preparation
    prepared_body, prepared_headers = await self.prepare_provider_request(
        body, headers, endpoint
    )
    # Logging only — never let trace formatting break the request path.
    with contextlib.suppress(Exception):
        logger.trace(
            "provider_request_prepared",
            endpoint=endpoint,
            header_keys=list(prepared_headers.keys()),
            body_size=len(prepared_body or b""),
            category="http",
        )

    # Step 5: Execute HTTP request
    target_url = await self.get_target_url(endpoint)
    # Hook point: listeners may rewrite any of method/url/body/headers.
    (
        method,
        target_url,
        prepared_body,
        prepared_headers,
    ) = await self._emit_provider_request_prepared(
        request_obj=request,
        ctx=ctx,
        method=method,
        endpoint=endpoint,
        target_url=target_url,
        prepared_body=prepared_body,
        prepared_headers=prepared_headers,
        is_streaming=False,
    )
    provider_response = await self._execute_http_request(
        method,
        target_url,
        prepared_headers,
        prepared_body,
    )
    logger.trace(
        "provider_response_received",
        status_code=getattr(provider_response, "status_code", None),
        content_type=getattr(provider_response, "headers", {}).get(
            "content-type", None
        ),
        category="http",
    )

    # Step 6: Provider-specific response processing
    response = await self.process_provider_response(provider_response, endpoint)
    # filter out hop-by-hop headers
    # NOTE: `headers` is rebound here from request headers to the filtered
    # provider *response* headers; it is only used for responses below.
    headers = filter_response_headers(dict(provider_response.headers))

    # Step 7: Format the response
    if isinstance(response, StreamingResponse):
        # Provider replied with a stream even though the client request
        # was detected as non-streaming; convert it through the chain.
        logger.debug("process_provider_response_streaming")
        return await self._convert_streaming_response(
            response, ctx.format_chain, ctx
        )
    elif isinstance(response, Response):
        logger.debug("process_provider_response")
        response = self._restore_model_response(response, ctx)
        # httpx has already decoded provider payloads, so strip encoding
        # headers that no longer match the body we forward to clients.
        for header in ("content-encoding", "transfer-encoding", "content-length"):
            with contextlib.suppress(KeyError):
                del response.headers[header]
        if ctx.format_chain and len(ctx.format_chain) > 1:
            # Error payloads (4xx/5xx) go through the chain's error stage.
            stage: Literal["response", "error"] = (
                "error" if provider_response.status_code >= 400 else "response"
            )
            try:
                payload = self._decode_json_body(
                    cast(bytes, response.body), context=stage
                )
            except ValueError as exc:
                # Non-JSON provider body: forward it untouched rather
                # than failing the whole request.
                logger.error(
                    "format_chain_response_parse_failed",
                    error=str(exc),
                    endpoint=endpoint,
                    stage=stage,
                    category="transform",
                )
                return response
            try:
                payload = await self._apply_format_chain(
                    data=payload,
                    format_chain=ctx.format_chain,
                    stage=stage,
                )
                # Restore the client-facing model alias: prefer the map
                # stored in ctx.metadata, else a ctx attribute fallback.
                metadata = getattr(ctx, "metadata", None)
                if isinstance(metadata, dict):
                    alias_map = metadata.get("_model_alias_map")
                else:
                    alias_map = None
                if not alias_map:
                    alias_map = getattr(ctx, "_model_alias_map", None)
                if isinstance(metadata, dict):
                    if (
                        isinstance(payload, dict)
                        and isinstance(alias_map, Mapping)
                        and isinstance(payload.get("model"), str)
                    ):
                        payload["model"] = alias_map.get(
                            payload["model"], payload["model"]
                        )
                    restore_model_aliases(payload, metadata)
                body_bytes = self._encode_json_body(payload)
                logger.info(
                    "format_chain_applied",
                    stage=stage,
                    endpoint=endpoint,
                    chain=ctx.format_chain,
                    steps=len(ctx.format_chain) - 1,
                    category="format",
                )
                # NOTE(review): `headers` here is the filtered provider
                # header dict, which was NOT stripped of
                # content-encoding/content-length above (only
                # `response.headers` was). If filter_response_headers
                # does not remove them, they may mismatch the
                # re-encoded body — confirm against that helper.
                restored = Response(
                    content=body_bytes,
                    status_code=provider_response.status_code,
                    headers=headers,
                    media_type=provider_response.headers.get(
                        "content-type", "application/json"
                    ),
                )
                return self._restore_model_response(restored, ctx)
            except Exception as e:
                logger.error(
                    "format_chain_response_failed",
                    error=str(e),
                    endpoint=endpoint,
                    stage=stage,
                    exc_info=e,
                    category="transform",
                )
                # Return proper error instead of potentially malformed response
                return JSONResponse(
                    status_code=500,
                    content={
                        "error": {
                            "type": "internal_server_error",
                            "message": "Failed to convert response format",
                            "details": str(e),
                        }
                    },
                )
        else:
            # No multi-step chain: pass the (alias-restored) response through.
            logger.debug("format_chain_skipped", reason="no forward chain")
            return self._restore_model_response(response, ctx)
    else:
        # Defensive fallback: process_provider_response returned something
        # unexpected; forward the raw provider payload with filtered headers.
        logger.warning(
            "unexpected_provider_response_type", type=type(response).__name__
        )
        restored = Response(
            content=provider_response.content,
            status_code=provider_response.status_code,
            headers=headers,
            media_type=headers.get("content-type", "application/json"),
        )
        return self._restore_model_response(restored, ctx)