from __future__ import annotations
import itertools
import re
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Iterable, Literal, Mapping, TypeVar, overload
from litestar.datastructures.cookie import Cookie
from litestar.datastructures.headers import ETag, MutableScopeHeaders
from litestar.enums import MediaType, OpenAPIMediaType
from litestar.exceptions import ImproperlyConfiguredException
from litestar.serialization import default_serializer, encode_json, encode_msgpack, get_serializer
from litestar.status_codes import HTTP_200_OK, HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED
from litestar.types.empty import Empty
from litestar.utils.deprecation import deprecated, warn_deprecation
from litestar.utils.helpers import get_enum_string_value
if TYPE_CHECKING:
from typing import Optional
from litestar.app import Litestar
from litestar.background_tasks import BackgroundTask, BackgroundTasks
from litestar.connection import Request
from litestar.types import (
HTTPResponseBodyEvent,
HTTPResponseStartEvent,
Receive,
ResponseCookies,
ResponseHeaders,
Scope,
Send,
Serializer,
TypeEncodersMap,
)
__all__ = ("ASGIResponse", "Response")
T = TypeVar("T")
MEDIA_TYPE_APPLICATION_JSON_PATTERN = re.compile(r"^application/(?:.+\+)?json")
class ASGIResponse:
"""A low-level ASGI response class."""
__slots__ = (
"background",
"body",
"content_length",
"encoding",
"is_head_response",
"status_code",
"_encoded_cookies",
"headers",
)
_should_set_content_length: ClassVar[bool] = True
"""A flag to indicate whether the content-length header should be set by default or not."""
def __init__(
self,
*,
background: BackgroundTask | BackgroundTasks | None = None,
body: bytes | str = b"",
content_length: int | None = None,
cookies: Iterable[Cookie] | None = None,
encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
encoding: str = "utf-8",
headers: dict[str, Any] | Iterable[tuple[str, str]] | None = None,
is_head_response: bool = False,
media_type: MediaType | str | None = None,
status_code: int | None = None,
) -> None:
"""A low-level ASGI response class.
Args:
background: A background task or a list of background tasks to be executed after the response is sent.
body: encoded content to send in the response body.
content_length: The response content length.
cookies: The response cookies.
encoded_headers: The response headers.
encoding: The response encoding.
headers: The response headers.
is_head_response: A boolean indicating if the response is a HEAD response.
media_type: The response media type.
status_code: The response status code.
"""
body = body.encode() if isinstance(body, str) else body
status_code = status_code or HTTP_200_OK
self.headers = MutableScopeHeaders()
if encoded_headers is not None:
warn_deprecation("3.0", kind="parameter", deprecated_name="encoded_headers", alternative="headers")
for header_name, header_value in encoded_headers:
self.headers.add(header_name.decode("latin-1"), header_value.decode("latin-1"))
if headers is not None:
for k, v in headers.items() if isinstance(headers, dict) else headers:
self.headers.add(k, v) # pyright: ignore
media_type = get_enum_string_value(media_type or MediaType.JSON)
status_allows_body = (
status_code not in {HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED} and status_code >= HTTP_200_OK
)
if content_length is None:
content_length = len(body)
if not status_allows_body or is_head_response:
if body and body != b"null":
raise ImproperlyConfiguredException(
"response content is not supported for HEAD responses and responses with a status code "
"that does not allow content (304, 204, < 200)"
)
body = b""
else:
self.headers.setdefault(
"content-type", (f"{media_type}; charset={encoding}" if media_type.startswith("text/") else media_type)
)
if self._should_set_content_length:
self.headers.setdefault("content-length", str(content_length))
self.background = background
self.body = body
self.content_length = content_length
self._encoded_cookies = tuple(
cookie.to_encoded_header() for cookie in (cookies or ()) if not cookie.documentation_only
)
self.encoding = encoding
self.is_head_response = is_head_response
self.status_code = status_code
@property
@deprecated("3.0", kind="property", alternative="encode_headers()")
def encoded_headers(self) -> list[tuple[bytes, bytes]]:
return self.encode_headers()
def encode_headers(self) -> list[tuple[bytes, bytes]]:
return [*self.headers.headers, *self._encoded_cookies]
async def after_response(self) -> None:
"""Execute after the response is sent.
Returns:
None
"""
if self.background is not None:
await self.background()
async def start_response(self, send: Send) -> None:
"""Emit the start event of the response. This event includes the headers and status codes.
Args:
send: The ASGI send function.
Returns:
None
"""
event: HTTPResponseStartEvent = {
"type": "http.response.start",
"status": self.status_code,
"headers": self.encode_headers(),
}
await send(event)
async def send_body(self, send: Send, receive: Receive) -> None:
"""Emit the response body.
Args:
send: The ASGI send function.
receive: The ASGI receive function.
Notes:
- Response subclasses should customize this method if there is a need to customize sending data.
Returns:
None
"""
event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": self.body, "more_body": False}
await send(event)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""ASGI callable of the ``Response``.
Args:
scope: The ASGI connection scope.
receive: The ASGI receive function.
send: The ASGI send function.
Returns:
None
"""
await self.start_response(send=send)
if self.is_head_response:
event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False}
await send(event)
else:
await self.send_body(send=send, receive=receive)
await self.after_response()
class Response(Generic[T]):
"""Base Litestar HTTP response class, used as the basis for all other response classes."""
__slots__ = (
"background",
"content",
"cookies",
"encoding",
"headers",
"media_type",
"status_code",
"response_type_encoders",
)
content: T
type_encoders: Optional[TypeEncodersMap] = None # noqa: UP007
def __init__(
self,
content: T,
*,
background: BackgroundTask | BackgroundTasks | None = None,
cookies: ResponseCookies | None = None,
encoding: str = "utf-8",
headers: ResponseHeaders | None = None,
media_type: MediaType | OpenAPIMediaType | str | None = None,
status_code: int | None = None,
type_encoders: TypeEncodersMap | None = None,
) -> None:
"""Initialize the response.
Args:
content: A value for the response body that will be rendered into bytes string.
status_code: An HTTP status code.
media_type: A value for the response ``Content-Type`` header.
background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or
:class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished.
Defaults to ``None``.
headers: A string keyed dictionary of response headers. Header keys are insensitive.
cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response
``Set-Cookie`` header.
encoding: The encoding to be used for the response headers.
type_encoders: A mapping of types to callables that transform them into types supported for serialization.
"""
self.content = content
self.background = background
self.cookies: list[Cookie] = (
[Cookie(key=key, value=value) for key, value in cookies.items()]
if isinstance(cookies, Mapping)
else list(cookies or [])
)
self.encoding = encoding
self.headers: dict[str, Any] = (
dict(headers) if isinstance(headers, Mapping) else {h.name: h.value for h in headers or {}}
)
self.media_type = media_type
self.status_code = status_code
self.response_type_encoders = {**(self.type_encoders or {}), **(type_encoders or {})}
@overload
def set_cookie(self, /, cookie: Cookie) -> None: ...
@overload
def set_cookie(
self,
key: str,
value: str | None = None,
max_age: int | None = None,
expires: int | None = None,
path: str = "/",
domain: str | None = None,
secure: bool = False,
httponly: bool = False,
samesite: Literal["lax", "strict", "none"] = "lax",
) -> None: ...
def set_cookie( # type: ignore[misc]
self,
key: str | Cookie,
value: str | None = None,
max_age: int | None = None,
expires: int | None = None,
path: str = "/",
domain: str | None = None,
secure: bool = False,
httponly: bool = False,
samesite: Literal["lax", "strict", "none"] = "lax",
) -> None:
"""Set a cookie on the response. If passed a :class:`Cookie <.datastructures.Cookie>` instance, keyword
arguments will be ignored.
Args:
key: Key for the cookie or a :class:`Cookie <.datastructures.Cookie>` instance.
value: Value for the cookie, if none given defaults to empty string.
max_age: Maximal age of the cookie before its invalidated.
expires: Seconds from now until the cookie expires.
path: Path fragment that must exist in the request url for the cookie to be valid. Defaults to ``/``.
domain: Domain for which the cookie is valid.
secure: Https is required for the cookie.
httponly: Forbids javascript to access the cookie via ``document.cookie``.
samesite: Controls whether a cookie is sent with cross-site requests. Defaults to ``lax``.
Returns:
None.
"""
if not isinstance(key, Cookie):
key = Cookie(
domain=domain,
expires=expires,
httponly=httponly,
key=key,
max_age=max_age,
path=path,
samesite=samesite,
secure=secure,
value=value,
)
self.cookies.append(key)
def set_header(self, key: str, value: Any) -> None:
"""Set a header on the response.
Args:
key: Header key.
value: Header value.
Returns:
None.
"""
self.headers[key] = value
def set_etag(self, etag: str | ETag) -> None:
"""Set an etag header.
Args:
etag: An etag value.
Returns:
None
"""
self.headers["etag"] = etag.to_header() if isinstance(etag, ETag) else etag
def delete_cookie(
self,
key: str,
path: str = "/",
domain: str | None = None,
) -> None:
"""Delete a cookie.
Args:
key: Key of the cookie.
path: Path of the cookie.
domain: Domain of the cookie.
Returns:
None.
"""
cookie = Cookie(key=key, path=path, domain=domain, expires=0, max_age=0)
self.cookies = [c for c in self.cookies if c != cookie]
self.cookies.append(cookie)
def render(self, content: Any, media_type: str, enc_hook: Serializer = default_serializer) -> bytes:
"""Handle the rendering of content into a bytes string.
Returns:
An encoded bytes string
"""
if isinstance(content, bytes):
return content
if content is Empty:
raise RuntimeError("The `Empty` sentinel cannot be used as response content")
try:
if media_type.startswith("text/") and not content:
return b""
if isinstance(content, str):
return content.encode(self.encoding)
if media_type == MediaType.MESSAGEPACK:
return encode_msgpack(content, enc_hook)
if MEDIA_TYPE_APPLICATION_JSON_PATTERN.match(
media_type,
):
return encode_json(content, enc_hook)
raise ImproperlyConfiguredException(f"unsupported media_type {media_type} for content {content!r}")
except (AttributeError, ValueError, TypeError) as e:
raise ImproperlyConfiguredException("Unable to serialize response content") from e
def to_asgi_response(
self,
app: Litestar | None,
request: Request,
*,
background: BackgroundTask | BackgroundTasks | None = None,
cookies: Iterable[Cookie] | None = None,
encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
headers: dict[str, str] | None = None,
is_head_response: bool = False,
media_type: MediaType | str | None = None,
status_code: int | None = None,
type_encoders: TypeEncodersMap | None = None,
) -> ASGIResponse:
"""Create an ASGIResponse from a Response instance.
Args:
app: The :class:`Litestar <.app.Litestar>` application instance.
background: Background task(s) to be executed after the response is sent.
cookies: A list of cookies to be set on the response.
encoded_headers: A list of already encoded headers.
headers: Additional headers to be merged with the response headers. Response headers take precedence.
is_head_response: Whether the response is a HEAD response.
media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored.
request: The :class:`Request <.connection.Request>` instance.
status_code: Status code for the response. If ``status_code`` is already set on the response, this is
type_encoders: A dictionary of type encoders to use for encoding the response content.
Returns:
An ASGIResponse instance.
"""
if app is not None:
warn_deprecation(
version="2.1",
deprecated_name="app",
kind="parameter",
removal_in="3.0.0",
alternative="request.app",
)
headers = {**headers, **self.headers} if headers is not None else self.headers
cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies)
if type_encoders:
type_encoders = {**type_encoders, **(self.response_type_encoders or {})}
else:
type_encoders = self.response_type_encoders
media_type = get_enum_string_value(self.media_type or media_type or MediaType.JSON)
return ASGIResponse(
background=self.background or background,
body=self.render(self.content, media_type, get_serializer(type_encoders)),
cookies=cookies,
encoded_headers=encoded_headers,
encoding=self.encoding,
headers=headers,
is_head_response=is_head_response,
media_type=media_type,
status_code=self.status_code or status_code,
)