Interacting with repositories#
Now that we’ve covered the modelling basics, we are able to create our first repository class. The repository classes include all of the standard CRUD operations as well as a few advanced features such as pagination, filtering and bulk operations.
Tip
The full code for this tutorial can be found below in the Full Code section.
Before we jump into the code, let’s take a look at the functions available in the synchronous and asynchronous repositories.
Available Functions in the Repositories
Function | Category | Description |
---|---|---|
| Selecting Data | Select a single record by primary key. Raises an exception when no record is found. |
| Selecting Data | Select a single record specified by the |
| Selecting Data | Select a single record specified by the |
| Selecting Data | Select a list of records specified by the |
| Selecting Data | Select a list of records specified by the |
| Creating Data | Select a single record specified by the |
| Creating Data | Create a new record in the database. |
| Creating Data | Create one or more rows in the database. |
| Updating Data | Update an existing record in the database. |
| Updating Data | Update one or more rows in the database. |
| Updating Data | A single operation that updates or inserts a record based on whether or not the primary key value on the model object is populated. |
| Updating Data | Updates or inserts multiple records based on whether or not the primary key value on the model object is populated. |
| Removing Data | Remove a single record from the database. |
| Removing Data | Remove one or more records from the database. |
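The upsert rule described in the table (insert when the primary key is empty, update when it is populated) can be pictured with a toy in-memory store. This is only a sketch of the decision rule, not the repository's implementation: `Record`, `store`, and `upsert` are hypothetical names, and the real repository works against a database session.

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class Record:
    """Hypothetical model: `id` stays None until the record is stored."""

    name: str
    id: UUID | None = None


def upsert(store: dict[UUID, Record], obj: Record) -> str:
    """Insert when the primary key is empty, otherwise update in place."""
    if obj.id is None:
        obj.id = uuid4()  # assign a fresh primary key on insert
        store[obj.id] = obj
        return "inserted"
    store[obj.id] = obj  # overwrite the row that has this primary key
    return "updated"


store: dict[UUID, Record] = {}
author = Record(name="F. Scott Fitzgerald")
first = upsert(store, author)  # primary key empty, so this inserts
author.name = "Francis Scott Fitzgerald"
second = upsert(store, author)  # primary key populated, so this updates
```

Both calls go through the same function, and the store still holds a single row afterwards.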
Note
All three of the bulk DML operations will leverage dialect-specific enhancements to be as efficient as possible. In addition to using efficient bulk insert binds, the repository will optionally leverage multi-row RETURNING support where possible. The repository will automatically detect this support from the SQLAlchemy driver, so no additional interaction is required to enable this.
SQL engines generally have a limit to the number of elements that can be appended into an IN clause. The repository operations will automatically break lists that exceed this limit into multiple queries whose results are concatenated together before being returned. You do not need to account for this in your own code.
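The splitting of long IN lists can be pictured as follows. This is a minimal sketch of the idea, not the repository's actual code: `chunked`, `select_in_chunks`, `fake_fetch`, and the limit of 3 are made up for illustration, and `fake_fetch` stands in for a single `SELECT ... WHERE id IN (...)` query.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator, Sequence


def chunked(items: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


def select_in_chunks(
    ids: Sequence[int],
    fetch_chunk: Callable[[Sequence[int]], list[str]],
    limit: int,
) -> list[str]:
    """Run one query per chunk and concatenate the results in order."""
    rows: list[str] = []
    for chunk in chunked(ids, limit):
        rows.extend(fetch_chunk(chunk))
    return rows


queries: list[Sequence[int]] = []  # records what each simulated query received


def fake_fetch(chunk: Sequence[int]) -> list[str]:
    """Hypothetical stand-in for one IN-clause query."""
    queries.append(chunk)
    return [f"row-{pk}" for pk in chunk]


rows = select_in_chunks(list(range(1, 8)), fake_fetch, limit=3)
```

Seven ids with a limit of three produce three simulated queries, and the caller still receives one combined list.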
In the following examples, we’ll cover a few ways that you can use the repository within your applications.
Model Repository#
Here we import the SQLAlchemyAsyncRepository class and create an AuthorRepository repository class. This is all that’s required to include all of the integrated repository features.
app.py
from __future__ import annotations

from datetime import date

from rich import get_console
from sqlalchemy.orm import Mapped

from litestar.plugins.sqlalchemy import base, repository

console = get_console()


# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`).
class Author(base.UUIDBase):
    name: Mapped[str]
    dob: Mapped[date]
    dod: Mapped[date | None]


class AuthorRepository(repository.SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author
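To see why setting `model_type` can be all a subclass needs, here is a toy generic repository built only on the standard library. It illustrates the pattern and is not how `SQLAlchemyAsyncRepository` is implemented; `InMemoryRepository`, `NotFound`, `Book`, and `BookRepository` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Generic, TypeVar
from uuid import UUID, uuid4

ModelT = TypeVar("ModelT")


class NotFound(Exception):
    """Hypothetical error raised when a lookup finds nothing."""


class InMemoryRepository(Generic[ModelT]):
    """Toy base class: every method is written once for all models."""

    model_type: type[ModelT]

    def __init__(self) -> None:
        self._rows: dict[UUID, ModelT] = {}

    def add(self, obj: ModelT) -> ModelT:
        self._rows[obj.id] = obj  # assumes the model exposes an `id`
        return obj

    def get(self, pk: UUID) -> ModelT:
        """Return the row or raise when it does not exist."""
        try:
            return self._rows[pk]
        except KeyError:
            raise NotFound(f"No {self.model_type.__name__} with id {pk}") from None

    def get_one_or_none(self, pk: UUID) -> ModelT | None:
        """Return the row, or None when it does not exist."""
        return self._rows.get(pk)

    def delete(self, pk: UUID) -> ModelT:
        """Remove the row and hand back the removed object."""
        obj = self.get(pk)
        del self._rows[pk]
        return obj


@dataclass
class Book:
    title: str
    id: UUID = field(default_factory=uuid4)


class BookRepository(InMemoryRepository[Book]):
    model_type = Book  # the only line the subclass needs


repo = BookRepository()
book = repo.add(Book(title="The Great Gatsby"))
```

The subclass adds no methods of its own; everything it can do comes from the generic base.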
Repository Context Manager#
Since we’ll be using the repository outside of a Litestar application in this script, we’ll make a simple context manager to automatically handle the creation (and cleanup) of our Author repository.
The repository_factory function will do the following for us:
- Automatically create a new DB session from the SQLAlchemy configuration.
- Roll back the session when any exception occurs.
- Automatically commit after the function call completes.
app.py
# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            await db_session.rollback()
        else:
            await db_session.commit()
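The commit-on-success, rollback-on-error behavior can be checked without a database. The sketch below uses a hypothetical `FakeSession` that only records which method was called, and `unit_of_work` is an illustrative name. Unlike the factory above, this variant re-raises after rolling back: when a generator wrapped in `asynccontextmanager` catches an exception and then finishes normally, the exception is suppressed and the caller never sees it.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for an async DB session; it only records calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def unit_of_work(session: FakeSession) -> AsyncIterator[FakeSession]:
    try:
        yield session
    except Exception:
        await session.rollback()
        raise  # re-raise so the caller still sees the failure
    else:
        await session.commit()


async def demo() -> tuple[list[str], list[str]]:
    ok, failed = FakeSession(), FakeSession()
    async with unit_of_work(ok):
        pass  # body succeeds, so the session is committed
    try:
        async with unit_of_work(failed):
            raise ValueError("boom")  # body fails, so the session is rolled back
    except ValueError:
        pass
    return ok.calls, failed.calls


ok_calls, failed_calls = asyncio.run(demo())
```

Whether to re-raise is a design choice: suppressing keeps a script running after a failed step, while re-raising makes the failure visible to the caller.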
Creating, Updating and Removing Data#
To illustrate a few ways you can manipulate data in your database, we’ll go through the various CRUD operations:
Creating Data: Here’s a simple insert operation to populate our new Author table:
app.py
async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj
Updating Data: The update method will ensure any updates made to the model object are executed on the database:
app.py
async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj
Removing Data: The delete method accepts the primary key of the row you want to delete:
app.py
async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj
Now that we’ve seen how to do single-row operations, let’s look at the bulk methods we can use.
Working with Bulk Data Operations#
In this section, we delve into the powerful capabilities of the repository classes for handling bulk data operations. Our example illustrates how we can efficiently manage data in our database. Specifically, we’ll use a JSON file containing information about US states and their abbreviations.
Here’s what we’re going to cover:
Fixture Data Loading#
We will introduce a method for loading fixture data. Fixture data is sample data that populates your database and helps test the behavior of your application under realistic conditions. This pattern can be extended and adjusted to meet your needs.
app.py
import json
from pathlib import Path
from typing import Any


def open_fixture(fixtures_path: Path, fixture_name: str) -> Any:
    """Loads JSON file with the specified fixture name

    Args:
        fixtures_path (Path): The path to look for fixtures
        fixture_name (str): The fixture name to load.

    Raises:
        FileNotFoundError: Fixtures not found.

    Returns:
        Any: The parsed JSON data
    """
    fixture = Path(fixtures_path / f"{fixture_name}.json")
    if fixture.exists():
        with fixture.open(mode="r", encoding="utf-8") as f:
            f_data = f.read()
        return json.loads(f_data)
    raise FileNotFoundError(f"Could not find the {fixture_name} fixture")
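A quick way to try the loader is to point it at a temporary directory. The sketch below repeats a condensed copy of `open_fixture` so that it runs on its own; the two-row sample and the `missing` fixture name are made up for the demonstration.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def open_fixture(fixtures_path: Path, fixture_name: str) -> Any:
    """Condensed copy of the loader above so this snippet runs on its own."""
    fixture = fixtures_path / f"{fixture_name}.json"
    if fixture.exists():
        return json.loads(fixture.read_text(encoding="utf-8"))
    raise FileNotFoundError(f"Could not find the {fixture_name} fixture")


with tempfile.TemporaryDirectory() as tmp:
    fixtures_dir = Path(tmp)
    # Write a two-row sample in the same shape as the US state file.
    sample = [
        {"name": "Alabama", "abbreviation": "AL"},
        {"name": "Alaska", "abbreviation": "AK"},
    ]
    (fixtures_dir / "us_state_lookup.json").write_text(json.dumps(sample), encoding="utf-8")

    # The fixture name is the file name without the ".json" suffix.
    loaded = open_fixture(fixtures_dir, "us_state_lookup")

    # A name with no matching file raises FileNotFoundError.
    try:
        open_fixture(fixtures_dir, "missing")
        missing_error = None
    except FileNotFoundError as exc:
        missing_error = str(exc)
```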
You can review the JSON source file, US State Lookup JSON, here. Download it from /examples/contrib/sqlalchemy/us_state_lookup.json or view it below:
us_state_lookup.json
[
{
"name": "Alabama",
"abbreviation": "AL"
},
{
"name": "Alaska",
"abbreviation": "AK"
},
{
"name": "Arizona",
"abbreviation": "AZ"
},
{
"name": "Arkansas",
"abbreviation": "AR"
},
{
"name": "California",
"abbreviation": "CA"
},
{
"name": "Colorado",
"abbreviation": "CO"
},
{
"name": "Connecticut",
"abbreviation": "CT"
},
{
"name": "Delaware",
"abbreviation": "DE"
},
{
"name": "District Of Columbia",
"abbreviation": "DC"
},
{
"name": "Florida",
"abbreviation": "FL"
},
{
"name": "Georgia",
"abbreviation": "GA"
},
{
"name": "Guam",
"abbreviation": "GU"
},
{
"name": "Hawaii",
"abbreviation": "HI"
},
{
"name": "Idaho",
"abbreviation": "ID"
},
{
"name": "Illinois",
"abbreviation": "IL"
},
{
"name": "Indiana",
"abbreviation": "IN"
},
{
"name": "Iowa",
"abbreviation": "IA"
},
{
"name": "Kansas",
"abbreviation": "KS"
},
{
"name": "Kentucky",
"abbreviation": "KY"
},
{
"name": "Louisiana",
"abbreviation": "LA"
},
{
"name": "Maine",
"abbreviation": "ME"
},
{
"name": "Maryland",
"abbreviation": "MD"
},
{
"name": "Massachusetts",
"abbreviation": "MA"
},
{
"name": "Michigan",
"abbreviation": "MI"
},
{
"name": "Minnesota",
"abbreviation": "MN"
},
{
"name": "Mississippi",
"abbreviation": "MS"
},
{
"name": "Missouri",
"abbreviation": "MO"
},
{
"name": "Montana",
"abbreviation": "MT"
},
{
"name": "Nebraska",
"abbreviation": "NE"
},
{
"name": "Nevada",
"abbreviation": "NV"
},
{
"name": "New Hampshire",
"abbreviation": "NH"
},
{
"name": "New Jersey",
"abbreviation": "NJ"
},
{
"name": "New Mexico",
"abbreviation": "NM"
},
{
"name": "New York",
"abbreviation": "NY"
},
{
"name": "North Carolina",
"abbreviation": "NC"
},
{
"name": "North Dakota",
"abbreviation": "ND"
},
{
"name": "Ohio",
"abbreviation": "OH"
},
{
"name": "Oklahoma",
"abbreviation": "OK"
},
{
"name": "Oregon",
"abbreviation": "OR"
},
{
"name": "Palau",
"abbreviation": "PW"
},
{
"name": "Pennsylvania",
"abbreviation": "PA"
},
{
"name": "Puerto Rico",
"abbreviation": "PR"
},
{
"name": "Rhode Island",
"abbreviation": "RI"
},
{
"name": "South Carolina",
"abbreviation": "SC"
},
{
"name": "South Dakota",
"abbreviation": "SD"
},
{
"name": "Tennessee",
"abbreviation": "TN"
},
{
"name": "Texas",
"abbreviation": "TX"
},
{
"name": "Utah",
"abbreviation": "UT"
},
{
"name": "Vermont",
"abbreviation": "VT"
},
{
"name": "Virginia",
"abbreviation": "VA"
},
{
"name": "Washington",
"abbreviation": "WA"
},
{
"name": "West Virginia",
"abbreviation": "WV"
},
{
"name": "Wisconsin",
"abbreviation": "WI"
},
{
"name": "Wyoming",
"abbreviation": "WY"
}
]
Bulk Insert#
We’ll use our fixture data to demonstrate a bulk insert operation. This operation allows you to add multiple records to your database in a single transaction, improving performance when working with larger data sets.
app.py
from pathlib import Path

from rich import get_console
from sqlalchemy import create_engine
from sqlalchemy.orm import Mapped, Session, sessionmaker

from litestar.plugins.sqlalchemy import base, repository

# Assumption: the fixture JSON file sits in the same directory as this script.
here = Path(__file__).parent
console = get_console()


class USState(base.UUIDBase):
    __tablename__ = "us_state_lookup"  # type: ignore[assignment]
    abbreviation: Mapped[str]
    name: Mapped[str]


class USStateRepository(repository.SQLAlchemySyncRepository[USState]):
    """US State repository."""

    model_type = USState


engine = create_engine(
    "duckdb:///:memory:",
    future=True,
)
session_factory: sessionmaker[Session] = sessionmaker(engine, expire_on_commit=False)


def run_script() -> None:
    """Load data from a fixture."""

    # Initializes the database.
    with engine.begin() as conn:
        USState.metadata.create_all(conn)

    with session_factory() as db_session:
        # 1) Load the JSON data into the US States table.
        # `open_fixture` is the loader defined in the previous section.
        repo = USStateRepository(session=db_session)
        fixture = open_fixture(here, USStateRepository.model_type.__tablename__)  # type: ignore
        objs = repo.add_many([USStateRepository.model_type(**raw_obj) for raw_obj in fixture])
        db_session.commit()
        console.print(f"Created {len(objs)} new objects.")
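The list comprehension in step 1 turns each JSON object into a model instance by unpacking it as keyword arguments. The same idea is shown here with a plain dataclass; `StateRow` is a hypothetical stand-in for the mapped class, not the `USState` model itself.

```python
from dataclasses import dataclass


@dataclass
class StateRow:
    """Hypothetical stand-in whose fields match the fixture's JSON keys."""

    abbreviation: str
    name: str


fixture = [
    {"name": "Alabama", "abbreviation": "AL"},
    {"name": "Alaska", "abbreviation": "AK"},
]

# Each dict is unpacked into keyword arguments; for this dataclass, a key
# that does not match a field name raises TypeError.
objs = [StateRow(**raw_obj) for raw_obj in fixture]
```

Because the arguments are passed by keyword, the order of the keys in the JSON file does not matter.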
Paginated Data Selection#
Next, let’s explore how to select multiple records with pagination. This functionality is useful for handling large amounts of data by breaking the data into manageable ‘pages’ or subsets. LimitOffset is one of several filter types you can use with the repository.
app.py
from litestar.repository.filters import LimitOffset


        # 2) Select paginated data and total row count.
        created_objs, total_objs = repo.list_and_count(LimitOffset(limit=10, offset=0))
        console.print(f"Selected {len(created_objs)} records out of a total of {total_objs}.")
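Limit and offset paging can be pictured with a plain list. In this sketch, `paginate` is a hypothetical helper that mimics the shape of the result (one page plus the total count) and does not use the repository or a database. The 54 rows match the number of entries in the fixture file above.

```python
from __future__ import annotations


def paginate(rows: list[str], limit: int, offset: int) -> tuple[list[str], int]:
    """Return one page of rows plus the total number of rows."""
    return rows[offset : offset + limit], len(rows)


rows = [f"state-{n}" for n in range(54)]  # same size as the fixture

offset, limit = 0, 10
page_sizes: list[int] = []
while True:
    page, total = paginate(rows, limit=limit, offset=offset)
    if not page:
        break  # an empty page means we have walked past the last row
    page_sizes.append(len(page))
    offset += limit  # advance to the next page
```

Walking the offsets 0, 10, 20 and so on yields five full pages and a final page of four rows, while the total stays at 54 on every call.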
Bulk Delete#
Here we demonstrate how to perform a bulk delete operation. Just as with the bulk insert, deleting multiple records with the batch record methods is more efficient than executing row-by-row.
app.py
        # 3) Let's remove the batch of records selected.
        deleted_objs = repo.delete_many([new_obj.id for new_obj in created_objs])
        console.print(f"Removed {len(deleted_objs)} records out of a total of {total_objs}.")
Counts#
Finally, we’ll demonstrate how to count the number of records remaining in the database.
app.py
        # 4) Let's count the remaining rows
        remaining_count = repo.count()
        console.print(f"Found {remaining_count} remaining records after delete.")
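Assuming the four steps run in order against the 54-entry fixture, the counts work out as sketched below. This is plain arithmetic over a dictionary with no database involved; `table` is a hypothetical stand-in for the `us_state_lookup` table.

```python
# 1) Bulk insert: the fixture file holds 54 entries.
table = {pk: f"state-{pk}" for pk in range(54)}
total = len(table)

# 2) First page: limit=10, offset=0.
page_ids = sorted(table)[:10]

# 3) Bulk delete: remove every row on that page, keeping the removed values.
deleted = [table.pop(pk) for pk in page_ids]

# 4) Count what is left.
remaining = len(table)
```

Under that assumption, the script's messages should report 54 created, 10 selected, 10 removed, and 44 remaining.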
Now that we have demonstrated how to interact with the repository objects outside of a Litestar application, our next example will use dependency injection to add this functionality to a Controller!
Full Code#
app.py
from __future__ import annotations

from contextlib import asynccontextmanager
from datetime import date, datetime
from typing import AsyncIterator
from uuid import UUID

import anyio
from rich import get_console
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import Mapped

from litestar.plugins.sqlalchemy import base, repository

console = get_console()


# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`).
class Author(base.UUIDBase):
    name: Mapped[str]
    dob: Mapped[date]
    dod: Mapped[date | None]


class AuthorRepository(repository.SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author


engine = create_async_engine(
    "sqlite+aiosqlite:///test.sqlite",
    future=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            await db_session.rollback()
        else:
            await db_session.commit()


async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def get_author_if_exists(id: UUID) -> Author | None:
    async with repository_factory() as repo:
        obj = await repo.get_one_or_none(id=id)
        if obj is not None:
            console.print(f"Found Author record for {obj.name} with primary key {obj.id}.")
        else:
            console.print(f"Could not find Author with primary key {id}.")
        return obj


async def run_script() -> None:
    """Create, update, delete, and then look up an Author record."""
    async with engine.begin() as conn:
        await conn.run_sync(base.UUIDBase.metadata.create_all)

    # 1) create a new Author record.
    console.print("1) Adding a new record")
    author = await create_author()
    author_id = author.id

    # 2) Let's update the Author record.
    console.print("2) Updating a record.")
    author.dod = datetime.strptime("1940-12-21", "%Y-%m-%d").date()
    await update_author(author)

    # 3) Let's delete the record we just created.
    console.print("3) Removing a record.")
    await remove_author(author_id)

    # 4) Let's verify the record no longer exists.
    console.print("4) Select one or none.")
    _should_be_none = await get_author_if_exists(author_id)


if __name__ == "__main__":
    anyio.run(run_script)