Working with Controllers and Repositories

We’ve been working our way up the stack, starting with the database models, and now we are ready to use the repository in an actual route. Let’s see how we can use this in a controller.

Tip

The full code for this tutorial can be found below in the Full Code section.

First, we create a simple function that returns an instance of AuthorRepository. This function will be used to inject a repository instance into our controller routes. Note that we are only passing in the database session in this example with no other parameters.

app.py
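1async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
2    """This provides the default Authors repository."""
3    return AuthorRepository(session=db_session)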

Because we’ll be using the SQLAlchemy plugin in Litestar, the session is automatically configured as a dependency.

By default, the repository doesn’t add any additional query options to your base statement, but provides the flexibility to override it, allowing you to pass your own statement:

app.py
1# we can optionally override the default `select` used for the repository to pass in
2# specific SQL options such as join details
3async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
4    """This provides a simple example demonstrating how to override the join options for the repository."""
5    return AuthorRepository(
6        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
7        session=db_session,
8    )

In this instance, we enhance the repository function by adding a selectinload option. This option loads the specified relationship (here, the author's books) using the SELECT ... IN loading pattern, optimizing the query execution.

Next, we define the AuthorController. This controller exposes five routes for interacting with the Author model:

AuthorController (click to toggle)
app.py
 1    return filters.LimitOffset(page_size, page_size * (current_page - 1))
 2
 3
 4class AuthorController(Controller):
 5    """Author CRUD"""
 6
 7    dependencies = {"authors_repo": Provide(provide_authors_repo)}
 8
 9    @get(path="/authors")
10    async def list_authors(
11        self,
12        authors_repo: AuthorRepository,
13        limit_offset: filters.LimitOffset,
14    ) -> service.OffsetPagination[Author]:
15        """List authors."""
16        results, total = await authors_repo.list_and_count(limit_offset)
17        type_adapter = TypeAdapter(list[Author])
18        return service.OffsetPagination[Author](
19            items=type_adapter.validate_python(results),
20            total=total,
21            limit=limit_offset.limit,
22            offset=limit_offset.offset,
23        )
24
25    @post(path="/authors")
26    async def create_author(
27        self,
28        authors_repo: AuthorRepository,
29        data: AuthorCreate,
30    ) -> Author:
31        """Create a new author."""
32        obj = await authors_repo.add(
33            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
34        )
35        await authors_repo.session.commit()
36        return Author.model_validate(obj)
37
38    # we override the authors_repo to use the version that joins the Books in
39    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
40    async def get_author(
41        self,
42        authors_repo: AuthorRepository,
43        author_id: UUID = Parameter(
44            title="Author ID",
45            description="The author to retrieve.",
46        ),
47    ) -> Author:
48        """Get an existing author."""
49        obj = await authors_repo.get(author_id)
50        return Author.model_validate(obj)
51
52    @patch(
53        path="/authors/{author_id:uuid}",
54        dependencies={"authors_repo": Provide(provide_author_details_repo)},
55    )
56    async def update_author(
57        self,
58        authors_repo: AuthorRepository,
59        data: AuthorUpdate,
60        author_id: UUID = Parameter(
61            title="Author ID",
62            description="The author to update.",
63        ),
64    ) -> Author:
65        """Update an author."""
66        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
67        raw_obj.update({"id": author_id})
68        obj = await authors_repo.update(AuthorModel(**raw_obj))
69        await authors_repo.session.commit()
70        return Author.model_validate(obj)
71
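72    @delete(path="/authors/{author_id:uuid}")
73    async def delete_author(
74        self,
75        authors_repo: AuthorRepository,
76        author_id: UUID = Parameter(
77            title="Author ID",
78            description="The author to delete.",
79        ),
80    ) -> None:
81        """Delete an author from the system."""
82        _ = await authors_repo.delete(author_id)
83        await authors_repo.session.commit()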

In our list endpoint, we use the pagination filter to limit the amount of data returned, allowing us to retrieve large datasets in smaller, more manageable chunks.

In the above examples, we’ve used the asynchronous repository implementation. However, Litestar also supports synchronous database drivers with an identical implementation. Here’s a corresponding synchronous version of the previous example:

Synchronous Repository (click to toggle)
app.py
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey
 10from sqlalchemy.orm import Mapped, mapped_column, relationship
 11
 12from litestar import Litestar, delete, get, patch, post
 13from litestar.controller import Controller
 14from litestar.di import Provide
 15from litestar.params import Parameter
 16from litestar.plugins.sqlalchemy import SQLAlchemyPlugin, SQLAlchemySyncConfig
 17from litestar.plugins.sqlalchemy.base import UUIDAuditBase, UUIDBase
 18from litestar.plugins.sqlalchemy.filters import LimitOffset
 19from litestar.plugins.sqlalchemy.repository import SQLAlchemySyncRepository
 20from litestar.plugins.sqlalchemy.service import OffsetPagination
 21
 22if TYPE_CHECKING:
 23    from sqlalchemy.orm import Session
 24
 25
 26class BaseModel(_BaseModel):
 27    """Extend Pydantic's BaseModel to enable ORM mode"""
 28
 29    model_config = {"from_attributes": True}
 30
 31
 32# The SQLAlchemy base includes a declarative model for you to use in your models.
 33# The `UUIDBase` class includes a `UUID` based primary key (`id`)
 34class AuthorModel(UUIDBase):
 35    # we can optionally provide the table name instead of auto-generating it
 36    __tablename__ = "author"
 37    name: Mapped[str]
 38    dob: Mapped[date | None]
 39    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 40
 41
 42# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
 43# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
 44# record was created, and `updated_at` is the last time the record was modified.
 45class BookModel(UUIDAuditBase):
 46    __tablename__ = "book"
 47    title: Mapped[str]
 48    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
 49    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 50
 51
 52# we will explicitly define the schema instead of using DTO objects for clarity.
 53
 54
 55class Author(BaseModel):
 56    id: UUID | None
 57    name: str
 58    dob: date | None = None
 59
 60
 61class AuthorCreate(BaseModel):
 62    name: str
 63    dob: date | None = None
 64
 65
 66class AuthorUpdate(BaseModel):
 67    name: str | None = None
 68    dob: date | None = None
 69
 70
 71class AuthorRepository(SQLAlchemySyncRepository[AuthorModel]):
 72    """Author repository."""
 73
 74    model_type = AuthorModel
 75
 76
 77async def provide_authors_repo(db_session: Session) -> AuthorRepository:
 78    """This provides the default Authors repository."""
 79    return AuthorRepository(session=db_session)
 80
 81
 82# we can optionally override the default `select` used for the repository to pass in
 83# specific SQL options such as join details
 84async def provide_author_details_repo(db_session: Session) -> AuthorRepository:
 85    """This provides a simple example demonstrating how to override the join options
 86    for the repository."""
 87    return AuthorRepository(
 88        session=db_session,
 89        load=[AuthorModel.books],
 90    )
 91
 92
 93def provide_limit_offset_pagination(
 94    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
 95    page_size: int = Parameter(
 96        query="pageSize",
 97        ge=1,
 98        default=10,
 99        required=False,
100    ),
101) -> LimitOffset:
102    """Add offset/limit pagination.
103
104    Return type consumed by `Repository.apply_limit_offset_pagination()`.
105
106    Parameters
107    ----------
108    current_page : int
109        1-based page number; the OFFSET is `page_size * (current_page - 1)`.
110    page_size : int
111        Number of rows per page; used as the LIMIT.
112    """
113    return LimitOffset(page_size, page_size * (current_page - 1))
114
115
116class AuthorController(Controller):
117    """CRUD routes for authors."""
118
119    dependencies = {"authors_repo": Provide(provide_authors_repo, sync_to_thread=False)}
120
121    @get(path="/authors")
122    def list_authors(
123        self,
124        authors_repo: AuthorRepository,
125        limit_offset: LimitOffset,
126    ) -> OffsetPagination[Author]:
127        """List one page of authors together with the total count."""
128        results, total = authors_repo.list_and_count(limit_offset)
129        type_adapter = TypeAdapter(list[Author])
130        return OffsetPagination[Author](
131            items=type_adapter.validate_python(results),
132            total=total,
133            limit=limit_offset.limit,
134            offset=limit_offset.offset,
135        )
136
137    @post(path="/authors")
138    def create_author(
139        self,
140        authors_repo: AuthorRepository,
141        data: AuthorCreate,
142    ) -> Author:
143        """Create a new author and commit the session."""
144        obj = authors_repo.add(
145            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
146        )
147        authors_repo.session.commit()
148        return Author.model_validate(obj)
149
150    # we override the authors_repo to use the version that is passed the author's Books to load
151    @get(
152        path="/authors/{author_id:uuid}",
153        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
154    )
155    def get_author(
156        self,
157        authors_repo: AuthorRepository,
158        author_id: UUID = Parameter(
159            title="Author ID",
160            description="The author to retrieve.",
161        ),
162    ) -> Author:
163        """Get an existing author by ID."""
164        obj = authors_repo.get(author_id)
165        return Author.model_validate(obj)
166
167    @patch(
168        path="/authors/{author_id:uuid}",
169        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
170    )
171    def update_author(
172        self,
173        authors_repo: AuthorRepository,
174        data: AuthorUpdate,
175        author_id: UUID = Parameter(
176            title="Author ID",
177            description="The author to update.",
178        ),
179    ) -> Author:
180        """Update an author; unset and `None` fields in the payload are left out of the update."""
181        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
182        raw_obj.update({"id": author_id})
183        obj = authors_repo.update(AuthorModel(**raw_obj))
184        authors_repo.session.commit()
185        return Author.model_validate(obj)
186
187    @delete(path="/authors/{author_id:uuid}")
188    def delete_author(
189        self,
190        authors_repo: AuthorRepository,
191        author_id: UUID = Parameter(
192            title="Author ID",
193            description="The author to delete.",
194        ),
195    ) -> None:
196        """Delete an author from the system and commit the session."""
197        _ = authors_repo.delete(author_id)
198        authors_repo.session.commit()
199
200
201sqlalchemy_config = SQLAlchemySyncConfig(
202    connection_string="sqlite:///test.sqlite", create_all=True
203)  # Create 'db_session' dependency.
204sqlalchemy_plugin = SQLAlchemyPlugin(config=sqlalchemy_config)
205
206
207app = Litestar(
208    route_handlers=[AuthorController],
209    plugins=[sqlalchemy_plugin],
210    dependencies={"limit_offset": Provide(provide_limit_offset_pagination)},
211)

The examples above enable a feature-complete CRUD service that includes pagination! In the next section, we’ll explore how to extend the built-in repository to add additional functionality to our application.

Full Code

Full Code (click to toggle)
app.py
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey, select
 10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
 11
 12from litestar import Litestar, delete, get, patch, post
 13from litestar.controller import Controller
 14from litestar.di import Provide
 15from litestar.params import Parameter
 16from litestar.plugins.sqlalchemy import (
 17    AsyncSessionConfig,
 18    SQLAlchemyAsyncConfig,
 19    SQLAlchemyPlugin,
 20    base,
 21    filters,
 22    repository,
 23    service,
 24)
 25
 26if TYPE_CHECKING:
 27    from sqlalchemy.ext.asyncio import AsyncSession
 28
 29
 30class BaseModel(_BaseModel):
 31    """Extend Pydantic's BaseModel to enable ORM mode"""
 32
 33    model_config = {"from_attributes": True}
 34
 35
 36# The SQLAlchemy base includes a declarative model for you to use in your models.
 37# The `UUIDBase` class includes a `UUID` based primary key (`id`)
 38class AuthorModel(base.UUIDBase):
 39    # we can optionally provide the table name instead of auto-generating it
 40    __tablename__ = "author"
 41    name: Mapped[str]
 42    dob: Mapped[date | None]
 43    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 44
 45
 46# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
 47# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
 48# record was created, and `updated_at` is the last time the record was modified.
 49class BookModel(base.UUIDAuditBase):
 50    __tablename__ = "book"
 51    title: Mapped[str]
 52    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
 53    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 54
 55
 56# we will explicitly define the schema instead of using DTO objects for clarity.
 57
 58
 59class Author(BaseModel):
 60    id: UUID | None
 61    name: str
 62    dob: date | None = None
 63
 64
 65class AuthorCreate(BaseModel):
 66    name: str
 67    dob: date | None = None
 68
 69
 70class AuthorUpdate(BaseModel):
 71    name: str | None = None
 72    dob: date | None = None
 73
 74
 75class AuthorRepository(repository.SQLAlchemyAsyncRepository[AuthorModel]):
 76    """Author repository."""
 77
 78    model_type = AuthorModel
 79
 80
 81async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
 82    """Provide the default `AuthorRepository` for the injected database session."""
 83    return AuthorRepository(session=db_session)
 84
 85
 86# we can optionally override the default `select` used for the repository to pass in
 87# specific SQL options such as join details
 88async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
 89    """Provide an `AuthorRepository` whose base statement loads each author's books via `selectinload`."""
 90    return AuthorRepository(
 91        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
 92        session=db_session,
 93    )
 94
 95
 96def provide_limit_offset_pagination(
 97    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
 98    page_size: int = Parameter(
 99        query="pageSize",
100        ge=1,
101        default=10,
102        required=False,
103    ),
104) -> filters.LimitOffset:
105    """Add offset/limit pagination.
106
107    Return type consumed by `Repository.apply_limit_offset_pagination()`.
108
109    Parameters
110    ----------
111    current_page : int
112        1-based page number; the OFFSET is `page_size * (current_page - 1)`.
113    page_size : int
114        Number of rows per page; used as the LIMIT.
115    """
116    return filters.LimitOffset(page_size, page_size * (current_page - 1))
117
118
119class AuthorController(Controller):
120    """CRUD routes for authors."""
121
122    dependencies = {"authors_repo": Provide(provide_authors_repo)}
123
124    @get(path="/authors")
125    async def list_authors(
126        self,
127        authors_repo: AuthorRepository,
128        limit_offset: filters.LimitOffset,
129    ) -> service.OffsetPagination[Author]:
130        """List one page of authors together with the total count."""
131        results, total = await authors_repo.list_and_count(limit_offset)
132        type_adapter = TypeAdapter(list[Author])
133        return service.OffsetPagination[Author](
134            items=type_adapter.validate_python(results),
135            total=total,
136            limit=limit_offset.limit,
137            offset=limit_offset.offset,
138        )
139
140    @post(path="/authors")
141    async def create_author(
142        self,
143        authors_repo: AuthorRepository,
144        data: AuthorCreate,
145    ) -> Author:
146        """Create a new author and commit the session."""
147        obj = await authors_repo.add(
148            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
149        )
150        await authors_repo.session.commit()
151        return Author.model_validate(obj)
152
153    # we override the authors_repo to use the version whose statement also loads the author's Books
154    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
155    async def get_author(
156        self,
157        authors_repo: AuthorRepository,
158        author_id: UUID = Parameter(
159            title="Author ID",
160            description="The author to retrieve.",
161        ),
162    ) -> Author:
163        """Get an existing author by ID."""
164        obj = await authors_repo.get(author_id)
165        return Author.model_validate(obj)
166
167    @patch(
168        path="/authors/{author_id:uuid}",
169        dependencies={"authors_repo": Provide(provide_author_details_repo)},
170    )
171    async def update_author(
172        self,
173        authors_repo: AuthorRepository,
174        data: AuthorUpdate,
175        author_id: UUID = Parameter(
176            title="Author ID",
177            description="The author to update.",
178        ),
179    ) -> Author:
180        """Update an author; unset and `None` fields in the payload are left out of the update."""
181        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
182        raw_obj.update({"id": author_id})
183        obj = await authors_repo.update(AuthorModel(**raw_obj))
184        await authors_repo.session.commit()
185        return Author.model_validate(obj)
186
187    @delete(path="/authors/{author_id:uuid}")
188    async def delete_author(
189        self,
190        authors_repo: AuthorRepository,
191        author_id: UUID = Parameter(
192            title="Author ID",
193            description="The author to delete.",
194        ),
195    ) -> None:
196        """Delete an author from the system and commit the session."""
197        _ = await authors_repo.delete(author_id)
198        await authors_repo.session.commit()
199
200
201session_config = AsyncSessionConfig(expire_on_commit=False)
202sqlalchemy_config = SQLAlchemyAsyncConfig(
203    connection_string="sqlite+aiosqlite:///test.sqlite", session_config=session_config, create_all=True
204)  # Create 'db_session' dependency.
205sqlalchemy_plugin = SQLAlchemyPlugin(config=sqlalchemy_config)
206
207
208app = Litestar(
209    route_handlers=[AuthorController],
210    plugins=[sqlalchemy_plugin],
211    dependencies={"limit_offset": Provide(provide_limit_offset_pagination)},
212)