Working with Controllers and Repositories¶
We’ve been working our way up the stack, starting with the database models, and now we are ready to use the repository in an actual route. Let’s see how we can use this in a controller.
Tip
The full code for this tutorial can be found below in the Full Code section.
First, we create a simple function that returns an instance of AuthorRepository
.
This function will be used to inject a repository instance into our controller routes.
Note that we are only passing in the database session in this example with no other
parameters.
app.py
¶1 model_type = AuthorModel
Because we’ll be using the SQLAlchemy plugin in Litestar, the session is automatically configured as a dependency.
By default, the repository doesn’t add any additional query options to your base statement, but provides the flexibility to override it, allowing you to pass your own statement:
app.py
¶1 return AuthorRepository(session=db_session)
2
3
4# we can optionally override the default `select` used for the repository to pass in
5# specific SQL options such as join details
6async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
7 """This provides a simple example demonstrating how to override the join options for the repository."""
8 return AuthorRepository(
In this instance, we enhance the repository function by adding a selectinload
option. This option configures the specified relationship to load via
SELECT .. IN … loading pattern, optimizing the query execution.
Next, we define the AuthorController
. This controller exposes five routes for
interacting with the Author
model:
AuthorController
(click to toggle)
app.py
¶ 1 return filters.LimitOffset(page_size, page_size * (current_page - 1))
2
3
4class AuthorController(Controller):
5 """Author CRUD"""
6
7 dependencies = {"authors_repo": Provide(provide_authors_repo)}
8
9 @get(path="/authors")
10 async def list_authors(
11 self,
12 authors_repo: AuthorRepository,
13 limit_offset: filters.LimitOffset,
14 ) -> service.OffsetPagination[Author]:
15 """List authors."""
16 results, total = await authors_repo.list_and_count(limit_offset)
17 type_adapter = TypeAdapter(list[Author])
18 return service.OffsetPagination[Author](
19 items=type_adapter.validate_python(results),
20 total=total,
21 limit=limit_offset.limit,
22 offset=limit_offset.offset,
23 )
24
25 @post(path="/authors")
26 async def create_author(
27 self,
28 authors_repo: AuthorRepository,
29 data: AuthorCreate,
30 ) -> Author:
31 """Create a new author."""
32 obj = await authors_repo.add(
33 AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
34 )
35 await authors_repo.session.commit()
36 return Author.model_validate(obj)
37
38 # we override the authors_repo to use the version that joins the Books in
39 @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
40 async def get_author(
41 self,
42 authors_repo: AuthorRepository,
43 author_id: UUID = Parameter(
44 title="Author ID",
45 description="The author to retrieve.",
46 ),
47 ) -> Author:
48 """Get an existing author."""
49 obj = await authors_repo.get(author_id)
50 return Author.model_validate(obj)
51
52 @patch(
53 path="/authors/{author_id:uuid}",
54 dependencies={"authors_repo": Provide(provide_author_details_repo)},
55 )
56 async def update_author(
57 self,
58 authors_repo: AuthorRepository,
59 data: AuthorUpdate,
60 author_id: UUID = Parameter(
61 title="Author ID",
62 description="The author to update.",
63 ),
64 ) -> Author:
65 """Update an author."""
66 raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
67 raw_obj.update({"id": author_id})
68 obj = await authors_repo.update(AuthorModel(**raw_obj))
69 await authors_repo.session.commit()
70 return Author.model_validate(obj)
71
72 @delete(path="/authors/{author_id:uuid}")
73 async def delete_author(
74 self,
75 authors_repo: AuthorRepository,
76 author_id: UUID = Parameter(
77 title="Author ID",
78 description="The author to delete.",
79 ),
In our list detail endpoint, we use the pagination filter for limiting the amount of data returned, allowing us to retrieve large datasets in smaller, more manageable chunks.
In the above examples, we’ve used the asynchronous repository implementation. However, Litestar also supports synchronous database drivers with an identical implementation. Here’s a corresponding synchronous version of the previous example:
Synchronous Repository (click to toggle)
app.py
¶ 1from __future__ import annotations
2
3from datetime import date
4from typing import TYPE_CHECKING
5from uuid import UUID
6
7from pydantic import BaseModel as _BaseModel
8from pydantic import TypeAdapter
9from sqlalchemy import ForeignKey
10from sqlalchemy.orm import Mapped, mapped_column, relationship
11
12from litestar import Litestar, delete, get, patch, post
13from litestar.controller import Controller
14from litestar.di import Provide
15from litestar.params import Parameter
16from litestar.plugins.sqlalchemy import SQLAlchemyPlugin, SQLAlchemySyncConfig
17from litestar.plugins.sqlalchemy.base import UUIDAuditBase, UUIDBase
18from litestar.plugins.sqlalchemy.filters import LimitOffset
19from litestar.plugins.sqlalchemy.repository import SQLAlchemySyncRepository
20from litestar.plugins.sqlalchemy.service import OffsetPagination
21
22if TYPE_CHECKING:
23 from sqlalchemy.orm import Session
24
25
26class BaseModel(_BaseModel):
27 """Extend Pydantic's BaseModel to enable ORM mode"""
28
29 model_config = {"from_attributes": True}
30
31
32# The SQLAlchemy base includes a declarative model for you to use in your models.
33# The `UUIDBase` class includes a `UUID` based primary key (`id`)
34class AuthorModel(UUIDBase):
35 # we can optionally provide the table name instead of auto-generating it
36 __tablename__ = "author"
37 name: Mapped[str]
38 dob: Mapped[date | None]
39 books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
40
41
42# The `UUIDAuditBase` class includes the same UUID` based primary key (`id`) and 2
43# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
44# record created, and `updated_at` is the last time the record was modified.
45class BookModel(UUIDAuditBase):
46 __tablename__ = "book"
47 title: Mapped[str]
48 author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
49 author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
50
51
52# we will explicitly define the schema instead of using DTO objects for clarity.
53
54
55class Author(BaseModel):
56 id: UUID | None
57 name: str
58 dob: date | None = None
59
60
61class AuthorCreate(BaseModel):
62 name: str
63 dob: date | None = None
64
65
66class AuthorUpdate(BaseModel):
67 name: str | None = None
68 dob: date | None = None
69
70
71class AuthorRepository(SQLAlchemySyncRepository[AuthorModel]):
72 """Author repository."""
73
74 model_type = AuthorModel
75
76
77async def provide_authors_repo(db_session: Session) -> AuthorRepository:
78 """This provides the default Authors repository."""
79 return AuthorRepository(session=db_session)
80
81
82# we can optionally override the default `select` used for the repository to pass in
83# specific SQL options such as join details
84async def provide_author_details_repo(db_session: Session) -> AuthorRepository:
85 """This provides a simple example demonstrating how to override the join options
86 for the repository."""
87 return AuthorRepository(
88 session=db_session,
89 load=[AuthorModel.books],
90 )
91
92
93def provide_limit_offset_pagination(
94 current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
95 page_size: int = Parameter(
96 query="pageSize",
97 ge=1,
98 default=10,
99 required=False,
100 ),
101) -> LimitOffset:
102 """Add offset/limit pagination.
103
104 Return type consumed by `Repository.apply_limit_offset_pagination()`.
105
106 Parameters
107 ----------
108 current_page : int
109 LIMIT to apply to select.
110 page_size : int
111 OFFSET to apply to select.
112 """
113 return LimitOffset(page_size, page_size * (current_page - 1))
114
115
116class AuthorController(Controller):
117 """Author CRUD"""
118
119 dependencies = {"authors_repo": Provide(provide_authors_repo, sync_to_thread=False)}
120
121 @get(path="/authors")
122 def list_authors(
123 self,
124 authors_repo: AuthorRepository,
125 limit_offset: LimitOffset,
126 ) -> OffsetPagination[Author]:
127 """List authors."""
128 results, total = authors_repo.list_and_count(limit_offset)
129 type_adapter = TypeAdapter(list[Author])
130 return OffsetPagination[Author](
131 items=type_adapter.validate_python(results),
132 total=total,
133 limit=limit_offset.limit,
134 offset=limit_offset.offset,
135 )
136
137 @post(path="/authors")
138 def create_author(
139 self,
140 authors_repo: AuthorRepository,
141 data: AuthorCreate,
142 ) -> Author:
143 """Create a new author."""
144 obj = authors_repo.add(
145 AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
146 )
147 authors_repo.session.commit()
148 return Author.model_validate(obj)
149
150 # we override the authors_repo to use the version that joins the Books in
151 @get(
152 path="/authors/{author_id:uuid}",
153 dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
154 )
155 def get_author(
156 self,
157 authors_repo: AuthorRepository,
158 author_id: UUID = Parameter(
159 title="Author ID",
160 description="The author to retrieve.",
161 ),
162 ) -> Author:
163 """Get an existing author."""
164 obj = authors_repo.get(author_id)
165 return Author.model_validate(obj)
166
167 @patch(
168 path="/authors/{author_id:uuid}",
169 dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
170 )
171 def update_author(
172 self,
173 authors_repo: AuthorRepository,
174 data: AuthorUpdate,
175 author_id: UUID = Parameter(
176 title="Author ID",
177 description="The author to update.",
178 ),
179 ) -> Author:
180 """Update an author."""
181 raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
182 raw_obj.update({"id": author_id})
183 obj = authors_repo.update(AuthorModel(**raw_obj))
184 authors_repo.session.commit()
185 return Author.model_validate(obj)
186
187 @delete(path="/authors/{author_id:uuid}")
188 def delete_author(
189 self,
190 authors_repo: AuthorRepository,
191 author_id: UUID = Parameter(
192 title="Author ID",
193 description="The author to delete.",
194 ),
195 ) -> None:
196 """Delete a author from the system."""
197 _ = authors_repo.delete(author_id)
198 authors_repo.session.commit()
199
200
201sqlalchemy_config = SQLAlchemySyncConfig(
202 connection_string="sqlite:///test.sqlite", create_all=True
203) # Create 'db_session' dependency.
204sqlalchemy_plugin = SQLAlchemyPlugin(config=sqlalchemy_config)
205
206
207app = Litestar(
208 route_handlers=[AuthorController],
209 plugins=[sqlalchemy_plugin],
210 dependencies={"limit_offset": Provide(provide_limit_offset_pagination)},
211)
The examples above enable a feature-complete CRUD service that includes pagination! In the next section, we’ll explore how to extend the built-in repository to add additional functionality to our application.
Full Code¶
Full Code (click to toggle)
app.py
¶ 1from __future__ import annotations
2
3from datetime import date
4from typing import TYPE_CHECKING
5from uuid import UUID
6
7from pydantic import BaseModel as _BaseModel
8from pydantic import TypeAdapter
9from sqlalchemy import ForeignKey, select
10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
11
12from litestar import Litestar, delete, get, patch, post
13from litestar.controller import Controller
14from litestar.di import Provide
15from litestar.params import Parameter
16from litestar.plugins.sqlalchemy import (
17 AsyncSessionConfig,
18 SQLAlchemyAsyncConfig,
19 SQLAlchemyPlugin,
20 base,
21 filters,
22 repository,
23 service,
24)
25
26if TYPE_CHECKING:
27 from sqlalchemy.ext.asyncio import AsyncSession
28
29
30class BaseModel(_BaseModel):
31 """Extend Pydantic's BaseModel to enable ORM mode"""
32
33 model_config = {"from_attributes": True}
34
35
36# The SQLAlchemy base includes a declarative model for you to use in your models.
37# The `UUIDBase` class includes a `UUID` based primary key (`id`)
38class AuthorModel(base.UUIDBase):
39 # we can optionally provide the table name instead of auto-generating it
40 __tablename__ = "author"
41 name: Mapped[str]
42 dob: Mapped[date | None]
43 books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
44
45
46# The `UUIDAuditBase` class includes the same UUID` based primary key (`id`) and 2
47# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
48# record created, and `updated_at` is the last time the record was modified.
49class BookModel(base.UUIDAuditBase):
50 __tablename__ = "book"
51 title: Mapped[str]
52 author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
53 author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
54
55
56# we will explicitly define the schema instead of using DTO objects for clarity.
57
58
59class Author(BaseModel):
60 id: UUID | None
61 name: str
62 dob: date | None = None
63
64
65class AuthorCreate(BaseModel):
66 name: str
67 dob: date | None = None
68
69
70class AuthorUpdate(BaseModel):
71 name: str | None = None
72 dob: date | None = None
73
74
75class AuthorRepository(repository.SQLAlchemyAsyncRepository[AuthorModel]):
76 """Author repository."""
77
78 model_type = AuthorModel
79
80
81async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
82 """This provides the default Authors repository."""
83 return AuthorRepository(session=db_session)
84
85
86# we can optionally override the default `select` used for the repository to pass in
87# specific SQL options such as join details
88async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
89 """This provides a simple example demonstrating how to override the join options for the repository."""
90 return AuthorRepository(
91 statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
92 session=db_session,
93 )
94
95
96def provide_limit_offset_pagination(
97 current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
98 page_size: int = Parameter(
99 query="pageSize",
100 ge=1,
101 default=10,
102 required=False,
103 ),
104) -> filters.LimitOffset:
105 """Add offset/limit pagination.
106
107 Return type consumed by `Repository.apply_limit_offset_pagination()`.
108
109 Parameters
110 ----------
111 current_page : int
112 LIMIT to apply to select.
113 page_size : int
114 OFFSET to apply to select.
115 """
116 return filters.LimitOffset(page_size, page_size * (current_page - 1))
117
118
119class AuthorController(Controller):
120 """Author CRUD"""
121
122 dependencies = {"authors_repo": Provide(provide_authors_repo)}
123
124 @get(path="/authors")
125 async def list_authors(
126 self,
127 authors_repo: AuthorRepository,
128 limit_offset: filters.LimitOffset,
129 ) -> service.OffsetPagination[Author]:
130 """List authors."""
131 results, total = await authors_repo.list_and_count(limit_offset)
132 type_adapter = TypeAdapter(list[Author])
133 return service.OffsetPagination[Author](
134 items=type_adapter.validate_python(results),
135 total=total,
136 limit=limit_offset.limit,
137 offset=limit_offset.offset,
138 )
139
140 @post(path="/authors")
141 async def create_author(
142 self,
143 authors_repo: AuthorRepository,
144 data: AuthorCreate,
145 ) -> Author:
146 """Create a new author."""
147 obj = await authors_repo.add(
148 AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
149 )
150 await authors_repo.session.commit()
151 return Author.model_validate(obj)
152
153 # we override the authors_repo to use the version that joins the Books in
154 @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
155 async def get_author(
156 self,
157 authors_repo: AuthorRepository,
158 author_id: UUID = Parameter(
159 title="Author ID",
160 description="The author to retrieve.",
161 ),
162 ) -> Author:
163 """Get an existing author."""
164 obj = await authors_repo.get(author_id)
165 return Author.model_validate(obj)
166
167 @patch(
168 path="/authors/{author_id:uuid}",
169 dependencies={"authors_repo": Provide(provide_author_details_repo)},
170 )
171 async def update_author(
172 self,
173 authors_repo: AuthorRepository,
174 data: AuthorUpdate,
175 author_id: UUID = Parameter(
176 title="Author ID",
177 description="The author to update.",
178 ),
179 ) -> Author:
180 """Update an author."""
181 raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
182 raw_obj.update({"id": author_id})
183 obj = await authors_repo.update(AuthorModel(**raw_obj))
184 await authors_repo.session.commit()
185 return Author.model_validate(obj)
186
187 @delete(path="/authors/{author_id:uuid}")
188 async def delete_author(
189 self,
190 authors_repo: AuthorRepository,
191 author_id: UUID = Parameter(
192 title="Author ID",
193 description="The author to delete.",
194 ),
195 ) -> None:
196 """Delete a author from the system."""
197 _ = await authors_repo.delete(author_id)
198 await authors_repo.session.commit()
199
200
201session_config = AsyncSessionConfig(expire_on_commit=False)
202sqlalchemy_config = SQLAlchemyAsyncConfig(
203 connection_string="sqlite+aiosqlite:///test.sqlite", session_config=session_config, create_all=True
204) # Create 'db_session' dependency.
205sqlalchemy_plugin = SQLAlchemyPlugin(config=sqlalchemy_config)
206
207
208app = Litestar(
209 route_handlers=[AuthorController],
210 plugins=[sqlalchemy_plugin],
211 dependencies={"limit_offset": Provide(provide_limit_offset_pagination)},
212)