import inspect
from collections import defaultdict
from collections.abc import Callable, Coroutine, Generator, Iterable, Mapping
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeGuard, TypeVar, cast, overload
from anyio.from_thread import BlockingPortal, start_blocking_portal
from litestar.exceptions import ImproperlyConfiguredException
from litestar.utils.empty import value_or_default
from litestar.utils.scope.state import ScopeState
from litestar_vite.inertia.types import ScrollPropsConfig
if TYPE_CHECKING:
from litestar.connection import ASGIConnection
from litestar_vite.inertia.plugin import InertiaPlugin
T = TypeVar("T")
PropKeyT = TypeVar("PropKeyT", bound=str)
StaticT = TypeVar("StaticT", bound=object)
DEFAULT_DEFERRED_GROUP = "default"
@overload
def lazy(key: str, value_or_callable: "None") -> "StaticProp[str, None]": ...
@overload
def lazy(key: str, value_or_callable: "T") -> "StaticProp[str, T]": ...
@overload
def lazy(key: str, value_or_callable: "Callable[..., None]" = ...) -> "DeferredProp[str, None]": ...
@overload
def lazy(
key: str, value_or_callable: "Callable[..., Coroutine[Any, Any, None]]" = ...
) -> "DeferredProp[str, None]": ...
@overload
def lazy(
key: str,
value_or_callable: "Callable[..., T | Coroutine[Any, Any, T]]" = ..., # pyright: ignore[reportInvalidTypeVarUse]
) -> "DeferredProp[str, T]": ...
[docs]
def lazy(
key: str,
value_or_callable: "T | Callable[..., Coroutine[Any, Any, None]] | Callable[..., T] | Callable[..., T | Coroutine[Any, Any, T]] | None" = None,
) -> "StaticProp[str, None] | StaticProp[str, T] | DeferredProp[str, T] | DeferredProp[str, None]":
"""Create a lazy prop only included during partial reloads.
Lazy props are excluded from the initial page load and only sent when
explicitly requested via partial reload (X-Inertia-Partial-Data header).
This optimizes initial page load by deferring non-critical data.
There are two use cases for lazy():
**1. Static Value (bandwidth optimization)**:
The value is computed eagerly but only sent during partial reloads.
Use when the value is cheap to compute but you want to reduce initial payload.
>>> lazy("user_count", len(users))
**2. Callable (bandwidth + CPU optimization)**:
The callable is only invoked during partial reloads.
Use when the value is expensive to compute.
>>> lazy("permissions", lambda: Permission.all())
.. warning:: **False Lazy Pitfall**
Be careful not to accidentally call the function when passing it.
Wrong::
lazy("data", expensive_fn())
Correct::
lazy("data", expensive_fn)
This is a Python evaluation order issue, not a framework limitation.
Args:
key: The key to store the value under in the props dict.
value_or_callable: Either a static value (computed eagerly, sent lazily)
or a callable (computed and sent lazily). If None, creates a lazy
prop with None value.
Returns:
StaticProp if value_or_callable is not callable, DeferredProp otherwise.
Example::
from litestar_vite.inertia import lazy, InertiaResponse
@get("/dashboard", component="Dashboard")
async def dashboard() -> InertiaResponse:
props = {
"user": current_user,
"user_count": lazy("user_count", 42),
"permissions": lazy("permissions", lambda: Permission.all()),
"notifications": lazy("notifications", fetch_notifications),
}
return InertiaResponse(props)
See Also:
- :func:`defer`: For v2 grouped deferred props loaded after page render
- Inertia.js partial reloads: https://inertiajs.com/partial-reloads
"""
if value_or_callable is None:
return StaticProp[str, None](key=key, value=None)
if not callable(value_or_callable):
return StaticProp[str, T](key=key, value=value_or_callable)
return DeferredProp[str, T](key=key, value=cast("Callable[..., T | Coroutine[Any, Any, T]]", value_or_callable))
[docs]
def defer(
key: str, callback: "Callable[..., T | Coroutine[Any, Any, T]]", group: str = DEFAULT_DEFERRED_GROUP
) -> "DeferredProp[str, T]":
"""Create a deferred prop with optional grouping (v2 feature).
Deferred props are loaded lazily after the initial page render.
Props in the same group are fetched together in a single request.
Args:
key: The key to store the value under.
callback: A callable (sync or async) that returns the value.
group: The group name for batched loading. Defaults to "default".
Returns:
A DeferredProp instance.
Example::
defer("permissions", lambda: Permission.all())
defer("teams", lambda: Team.all(), group="attributes")
defer("projects", lambda: Project.all(), group="attributes")
"""
return DeferredProp[str, T](key=key, value=callback, group=group)
[docs]
@dataclass
class PropFilter:
"""Configuration for prop filtering during partial reloads.
Used with ``only()`` and ``except_()`` helpers to explicitly control
which props are sent during partial reload requests.
Attributes:
include: Set of prop keys to include (only send these).
exclude: Set of prop keys to exclude (send all except these).
"""
include: "set[str] | None" = None
exclude: "set[str] | None" = None
[docs]
def should_include(self, key: str) -> bool:
"""Return True when a prop key should be included.
Returns:
True if the prop key should be included, otherwise False.
"""
if self.exclude is not None:
return key not in self.exclude
if self.include is not None:
return key in self.include
return True
[docs]
def only(*keys: str) -> PropFilter:
"""Create a filter that only includes the specified prop keys.
Use this to explicitly limit which props are sent during partial reloads.
Only the specified props will be included in the response.
Args:
*keys: The prop keys to include.
Returns:
A PropFilter configured to include only the specified keys.
Example::
from litestar_vite.inertia import only, InertiaResponse
@get("/users", component="Users")
async def list_users(
request: InertiaRequest,
user_service: UserService,
) -> InertiaResponse:
return InertiaResponse(
{
"users": user_service.list(),
"teams": team_service.list(),
"stats": stats_service.get(),
},
prop_filter=only("users"),
)
Note:
This is a server-side helper. The client should use Inertia's
``router.reload({ only: ['users'] })`` for client-initiated filtering.
"""
return PropFilter(include=set(keys))
[docs]
def except_(*keys: str) -> PropFilter:
"""Create a filter that excludes the specified prop keys.
Use this to explicitly exclude certain props during partial reloads.
All props except the specified ones will be included in the response.
Args:
*keys: The prop keys to exclude.
Returns:
A PropFilter configured to exclude the specified keys.
Example::
from litestar_vite.inertia import except_, InertiaResponse
@get("/users", component="Users")
async def list_users(
request: InertiaRequest,
user_service: UserService,
) -> InertiaResponse:
return InertiaResponse(
{
"users": user_service.list(),
"teams": team_service.list(),
"stats": expensive_stats(),
},
prop_filter=except_("stats"),
)
Note:
The function is named ``except_`` with a trailing underscore to avoid
conflicting with Python's ``except`` keyword.
"""
return PropFilter(exclude=set(keys))
[docs]
class MergeProp(Generic[PropKeyT, T]):
"""A wrapper for merge prop configuration (v2 feature).
Merge props allow data to be combined with existing props during
partial reloads instead of replacing them entirely.
"""
[docs]
def __init__(
self,
key: "PropKeyT",
value: "T",
strategy: "Literal['append', 'prepend', 'deep']" = "append",
match_on: "str | list[str] | None" = None,
) -> None:
"""Initialize a MergeProp.
Args:
key: The prop key.
value: The value to merge.
strategy: The merge strategy - 'append', 'prepend', or 'deep'.
match_on: Optional key(s) to match items on during merge.
"""
self._key = key
self._value = value
self._strategy = strategy
self._match_on = [match_on] if isinstance(match_on, str) else match_on
@property
def key(self) -> "PropKeyT":
return self._key
@property
def value(self) -> "T":
return self._value
@property
def strategy(self) -> "Literal['append', 'prepend', 'deep']":
return self._strategy # pyright: ignore[reportReturnType]
@property
def match_on(self) -> "list[str] | None":
return self._match_on
[docs]
def merge(
key: str,
value: "T",
strategy: "Literal['append', 'prepend', 'deep']" = "append",
match_on: "str | list[str] | None" = None,
) -> "MergeProp[str, T]":
"""Create a merge prop for combining data during partial reloads (v2 feature).
Merge props allow new data to be combined with existing props rather than
replacing them entirely. This is useful for infinite scroll, load more buttons,
and similar patterns.
Note: Prop merging only works during partial reloads. Full page visits
will always replace props entirely.
Args:
key: The prop key.
value: The value to merge.
strategy: How to merge the data:
- 'append': Add new items to the end (default)
- 'prepend': Add new items to the beginning
- 'deep': Recursively merge nested objects
match_on: Optional key(s) to match items on during merge,
useful for updating existing items instead of duplicating.
Returns:
A MergeProp instance.
Example::
merge("posts", new_posts)
merge("messages", new_messages, strategy="prepend")
merge("user_data", updates, strategy="deep")
merge("posts", updated_posts, match_on="id")
"""
return MergeProp[str, T](key=key, value=value, strategy=strategy, match_on=match_on)
[docs]
def is_merge_prop(value: "Any") -> "TypeGuard[MergeProp[Any, Any]]":
"""Check if value is a MergeProp.
Args:
value: Any value to check
Returns:
True if value is a MergeProp
"""
return isinstance(value, MergeProp)
[docs]
class StaticProp(Generic[PropKeyT, StaticT]):
"""A wrapper for static property evaluation."""
[docs]
def __init__(self, key: "PropKeyT", value: "StaticT") -> None:
self._key = key
self._result = value
@property
def key(self) -> "PropKeyT":
return self._key
def render(self, portal: "BlockingPortal | None" = None) -> "StaticT": # pyright: ignore
return self._result
[docs]
class DeferredProp(Generic[PropKeyT, T]):
"""A wrapper for deferred property evaluation."""
[docs]
def __init__(
self,
key: "PropKeyT",
value: "Callable[..., T | Coroutine[Any, Any, T] | None] | None" = None,
group: str = DEFAULT_DEFERRED_GROUP,
) -> None:
self._key = key
self._value = value
self._group = group
self._evaluated = False
self._result: "T | None" = None
@property
def group(self) -> str:
"""The deferred group this prop belongs to.
Returns:
The deferred group name.
"""
return self._group
@property
def key(self) -> "PropKeyT":
return self._key
@staticmethod
@contextmanager
def with_portal(portal: "BlockingPortal | None" = None) -> "Generator[BlockingPortal, None, None]":
if portal is None:
with start_blocking_portal() as p:
yield p
else:
yield portal
@staticmethod
def _is_awaitable(v: "Callable[..., T | Coroutine[Any, Any, T]]") -> "TypeGuard[Coroutine[Any, Any, T]]":
return inspect.iscoroutinefunction(v)
def render(self, portal: "BlockingPortal | None" = None) -> "T | None":
if self._evaluated:
return self._result
if self._value is None or not callable(self._value):
self._result = self._value
self._evaluated = True
return self._result
if not self._is_awaitable(cast("Callable[..., T]", self._value)):
self._result = cast("T", self._value())
self._evaluated = True
return self._result
with self.with_portal(portal) as p:
self._result = p.call(cast("Callable[..., T]", self._value))
self._evaluated = True
return self._result
[docs]
def is_lazy_prop(value: "Any") -> "TypeGuard[DeferredProp[Any, Any] | StaticProp[Any, Any]]":
"""Check if value is a deferred property.
Args:
value: Any value to check
Returns:
True if value is a deferred property
"""
return isinstance(value, (DeferredProp, StaticProp))
[docs]
def is_deferred_prop(value: "Any") -> "TypeGuard[DeferredProp[Any, Any]]":
"""Check if value is specifically a DeferredProp (not StaticProp).
Args:
value: Any value to check
Returns:
True if value is a DeferredProp
"""
return isinstance(value, DeferredProp)
[docs]
def should_render(
value: "Any",
partial_data: "set[str] | None" = None,
partial_except: "set[str] | None" = None,
key: "str | None" = None,
) -> "bool":
"""Check if value should be rendered based on partial reload filtering.
For v2 protocol, partial_except takes precedence over partial_data.
When a key is provided, filtering applies to all props (not just lazy props).
Args:
value: Any value to check
partial_data: Optional set of keys to include (X-Inertia-Partial-Data)
partial_except: Optional set of keys to exclude (X-Inertia-Partial-Except, v2)
key: Optional key name for this prop (enables key-based filtering for all props)
Returns:
bool: True if value should be rendered
"""
if is_lazy_prop(value):
if partial_except:
return value.key not in partial_except
if partial_data:
return value.key in partial_data
return False
if key is not None:
if partial_except:
return key not in partial_except
if partial_data:
return key in partial_data
return True
[docs]
def is_or_contains_lazy_prop(value: "Any") -> "bool":
"""Check if value is or contains a deferred property.
Args:
value: Any value to check
Returns:
True if value is or contains a deferred property
"""
if is_lazy_prop(value):
return True
if isinstance(value, str):
return False
if isinstance(value, Mapping):
return any(is_or_contains_lazy_prop(v) for v in cast("Mapping[str, Any]", value).values())
if isinstance(value, Iterable):
return any(is_or_contains_lazy_prop(v) for v in cast("Iterable[Any]", value))
return False
[docs]
def lazy_render(
value: "T",
partial_data: "set[str] | None" = None,
portal: "BlockingPortal | None" = None,
partial_except: "set[str] | None" = None,
) -> "T":
"""Filter deferred properties from the value based on partial data.
For v2 protocol, partial_except takes precedence over partial_data.
Args:
value: The value to filter
partial_data: Keys to include (X-Inertia-Partial-Data)
portal: Optional portal to use for async rendering
partial_except: Keys to exclude (X-Inertia-Partial-Except, v2)
Returns:
The filtered value
"""
if isinstance(value, str):
return cast("T", value)
if isinstance(value, Mapping):
return cast(
"T",
{
k: lazy_render(v, partial_data, portal, partial_except)
for k, v in cast("Mapping[str, Any]", value).items()
if should_render(v, partial_data, partial_except)
},
)
if isinstance(value, list):
return cast(
"T",
[
lazy_render(v, partial_data, portal, partial_except)
for v in cast("Iterable[Any]", value)
if should_render(v, partial_data, partial_except)
],
)
if isinstance(value, tuple):
return cast(
"T",
tuple(
lazy_render(v, partial_data, portal, partial_except)
for v in cast("Iterable[Any]", value)
if should_render(v, partial_data, partial_except)
),
)
if is_lazy_prop(value) and should_render(value, partial_data, partial_except):
return cast("T", value.render(portal))
return cast("T", value)
[docs]
def get_shared_props(
request: "ASGIConnection[Any, Any, Any, Any]",
partial_data: "set[str] | None" = None,
partial_except: "set[str] | None" = None,
) -> "dict[str, Any]":
"""Return shared session props for a request.
For v2 protocol, partial_except takes precedence over partial_data.
Args:
request: The ASGI connection.
partial_data: Optional set of keys to include (X-Inertia-Partial-Data).
partial_except: Optional set of keys to exclude (X-Inertia-Partial-Except, v2).
Returns:
The shared props.
Note:
Be sure to call this before `self.create_template_context` if you would like to include the `flash` message details.
"""
props: "dict[str, Any]" = {}
flash: "dict[str, list[str]]" = defaultdict(list)
errors: "dict[str, Any]" = {}
error_bag = request.headers.get("X-Inertia-Error-Bag", None)
try:
errors = request.session.pop("_errors", {})
shared_props = cast("dict[str,Any]", request.session.pop("_shared", {}))
inertia_plugin = cast("InertiaPlugin", request.app.plugins.get("InertiaPlugin"))
for key, value in shared_props.items():
if not should_render(value, partial_data, partial_except, key=key):
continue
if is_lazy_prop(value):
props[key] = value.render(inertia_plugin.portal)
else:
props[key] = value
for message in cast("list[dict[str,Any]]", request.session.pop("_messages", [])):
flash[message["category"]].append(message["message"])
for key, value in inertia_plugin.config.extra_static_page_props.items():
if should_render(value, partial_data, partial_except, key=key):
props[key] = value
for session_prop in inertia_plugin.config.extra_session_page_props:
if (
session_prop not in props
and session_prop in request.session
and should_render(None, partial_data, partial_except, key=session_prop)
):
props[session_prop] = request.session.get(session_prop)
except (AttributeError, ImproperlyConfiguredException):
msg = "Unable to generate all shared props. A valid session was not found for this request."
request.logger.warning(msg)
props["flash"] = flash
props["errors"] = {error_bag: errors} if error_bag is not None else errors
props["csrf_token"] = value_or_default(ScopeState.from_scope(request.scope).csrf_token, "")
return props
[docs]
def share(connection: "ASGIConnection[Any, Any, Any, Any]", key: "str", value: "Any") -> "None":
"""Share a value in the session.
Args:
connection: The ASGI connection.
key: The key to store the value under.
value: The value to store.
"""
try:
connection.session.setdefault("_shared", {}).update({key: value})
except (AttributeError, ImproperlyConfiguredException):
msg = "Unable to set `share` session state. A valid session was not found for this request."
connection.logger.warning(msg)
[docs]
def error(connection: "ASGIConnection[Any, Any, Any, Any]", key: "str", message: "str") -> "None":
"""Set an error message in the session.
Args:
connection: The ASGI connection.
key: The key to store the error under.
message: The error message.
"""
try:
connection.session.setdefault("_errors", {}).update({key: message})
except (AttributeError, ImproperlyConfiguredException):
msg = "Unable to set `error` session state. A valid session was not found for this request."
connection.logger.warning(msg)
[docs]
def flash(connection: "ASGIConnection[Any, Any, Any, Any]", message: "str", category: "str" = "info") -> "None":
"""Add a flash message to the session.
Flash messages are stored in the session and passed to the frontend
via the `flash` prop in every Inertia response. They're automatically
cleared after being displayed (pop semantics).
This function works without requiring Litestar's FlashPlugin or
any Jinja2 template configuration, making it ideal for SPA-only
Inertia applications.
Args:
connection: The ASGI connection (Request or WebSocket).
message: The message text to display.
category: The message category (e.g., "success", "error", "warning", "info").
Defaults to "info".
Example::
from litestar_vite.inertia import flash
@post("/create")
async def create_item(request: Request) -> InertiaResponse:
flash(request, "Item created successfully!", "success")
return InertiaResponse(...)
"""
try:
messages = connection.session.setdefault("_messages", [])
messages.append({"category": category, "message": message})
except (AttributeError, ImproperlyConfiguredException):
msg = "Unable to set flash message. A valid session was not found for this request."
connection.logger.warning(msg)
[docs]
def clear_history(connection: "ASGIConnection[Any, Any, Any, Any]") -> None:
"""Mark that the next response should clear client history encryption keys.
This function sets a session flag that will be consumed by the next
InertiaResponse, causing it to include `clearHistory: true` in the page
object. The Inertia client will then regenerate its encryption key,
invalidating all previously encrypted history entries.
This should typically be called during logout to ensure sensitive data
cannot be recovered from browser history after a user logs out.
Args:
connection: The ASGI connection (Request).
Note:
Requires session middleware to be configured.
See: https://inertiajs.com/history-encryption
Example::
from litestar_vite.inertia import clear_history
@post("/logout")
async def logout(request: Request) -> InertiaRedirect:
request.session.clear()
clear_history(request)
return InertiaRedirect(request, redirect_to="/login")
"""
try:
connection.session["_inertia_clear_history"] = True
except (AttributeError, ImproperlyConfiguredException):
msg = "Unable to set clear_history flag. A valid session was not found for this request."
connection.logger.warning(msg)
PAGINATION_ATTRS: tuple[tuple[str, str], ...] = (
("total", "total"),
("limit", "limit"),
("offset", "offset"),
("page_size", "pageSize"),
("current_page", "currentPage"),
("total_pages", "totalPages"),
("per_page", "perPage"),
("last_page", "lastPage"),
("has_more", "hasMore"),
("has_next", "hasNext"),
("has_previous", "hasPrevious"),
("next_cursor", "nextCursor"),
("previous_cursor", "previousCursor"),
)
def _has_offset_pagination_attrs(value: Any) -> bool:
try:
_ = value.limit
_ = value.offset
_ = value.total
except AttributeError:
return False
return True
def _has_classic_pagination_attrs(value: Any) -> bool:
try:
_ = value.current_page
_ = value.total_pages
except AttributeError:
return False
return True
def _extract_offset_pagination_meta(value: Any) -> tuple[int, int | None, int | None] | None:
try:
limit = value.limit
offset = value.offset
total = value.total
except AttributeError:
return None
if not (isinstance(limit, int) and isinstance(offset, int) and isinstance(total, int)):
return None
if limit > 0:
current_page = (offset // limit) + 1
total_pages = (total + limit - 1) // limit
else:
current_page = 1
total_pages = 1
previous_page = current_page - 1 if current_page > 1 else None
next_page = current_page + 1 if current_page < total_pages else None
return current_page, previous_page, next_page
def _extract_classic_pagination_meta(value: Any) -> tuple[int, int | None, int | None] | None:
try:
current_page = value.current_page
total_pages = value.total_pages
except AttributeError:
return None
if not (isinstance(current_page, int) and isinstance(total_pages, int)):
return None
previous_page = current_page - 1 if current_page > 1 else None
next_page = current_page + 1 if current_page < total_pages else None
return current_page, previous_page, next_page