Advanced¶
Advanced Alchemy supports bulk operations, specialized repositories, and custom query patterns for complex requirements.
Prerequisites¶
Bulk Operations¶
Repositories provide efficient bulk operation methods for working with multiple records.
Add Many¶
Create multiple records in a single operation:
from collections.abc import Sequence
async def create_posts(db_session: AsyncSession, data: list[tuple[str, str, UUID]]) -> Sequence[Post]:
repository = PostRepository(session=db_session)
# Create posts
return await repository.add_many(
[Post(title=title, content=content, author_id=author_id) for title, content, author_id in data],
auto_commit=True
)
Characteristics:
Single database round-trip
Batch insert operation
Returns sequence of created records
More efficient than multiple
add()calls
Update Many¶
Update multiple records:
async def publish_posts(db_session: AsyncSession, post_ids: list[int]) -> list[Post]:
repository = PostRepository(session=db_session)
# Fetch posts to update
posts = await repository.list(Post.id.in_(post_ids), Post.published == False)
# Update all posts
for post in posts:
post.published = True
return await repository.update_many(posts, auto_commit=True)
Characteristics:
Updates multiple records in single transaction
Maintains object state
Returns updated records
Delete Many¶
Delete multiple records by IDs:
async def delete_posts(db_session: AsyncSession, post_ids: list[int]) -> list[Post]:
repository = PostRepository(session=db_session)
return await repository.delete_many(post_ids, auto_commit=True)
Characteristics:
Deletes by primary key list
Single database operation
Returns deleted records (before deletion)
Delete Where¶
Delete records matching filter criteria:
async def delete_unpublished_posts(db_session: AsyncSession) -> list[Post]:
repository = PostRepository(session=db_session)
return await repository.delete_where(Post.published == False, auto_commit=True)
Characteristics:
Deletes all records matching filter
More efficient than fetch-then-delete
Returns deleted records (before deletion)
Upsert Operations¶
Upsert (insert or update) handles conflicts automatically:
async def upsert_post(
db_session: AsyncSession,
post_id: int,
title: str,
content: str
) -> Post:
repository = PostRepository(session=db_session)
return await repository.upsert(
{"id": post_id, "title": title, "content": content},
match_fields=["id"],
auto_commit=True
)
Parameters:
match_fields: Columns to match for existing recordsIf match found: updates existing record
If no match: inserts new record
Characteristics:
Atomic operation
Database-specific implementation (
ON CONFLICT,MERGE, etc.)Handles race conditions
Bulk Upsert¶
Upsert multiple records:
async def upsert_posts(
db_session: AsyncSession,
posts_data: list[dict]
) -> list[Post]:
repository = PostRepository(session=db_session)
return await repository.upsert_many(
posts_data,
match_fields=["id"],
auto_commit=True
)
Characteristics:
Efficient batch upsert
Single database round-trip
Maintains consistency
Transaction Management¶
Complex Multi-Repository Transactions¶
Coordinate multiple repositories in a single transaction:
from advanced_alchemy.utils.text import slugify
async def create_post_with_tags(
db_session: AsyncSession,
title: str,
content: str,
tag_names: list[str]
) -> Post:
# Both repositories share the same transaction
post_repo = PostRepository(session=db_session)
tag_repo = TagRepository(session=db_session)
async with db_session.begin():
# Create or get existing tags
tags = []
for name in tag_names:
tag = await tag_repo.get_one_or_none(Tag.name == name)
if not tag:
tag = await tag_repo.add(Tag(name=name, slug=slugify(name)))
tags.append(tag)
# Create post with tags
post = await post_repo.add(
Post(title=title, content=content, tags=tags),
auto_commit=True
)
return post
See also
This is just to illustrate the concept. In practice, UniqueMixin
should be used to handle this lookup more easily. See Using UniqueMixin.
Characteristics:
Multiple repositories share session
Single transaction boundary
Automatic rollback on exception
Maintains ACID properties
Specialized Repositories¶
Slug Repository¶
For models using the SlugKey mixin, use SQLAlchemyAsyncSlugRepository:
from advanced_alchemy.repository import SQLAlchemyAsyncSlugRepository
class ArticleRepository(SQLAlchemyAsyncSlugRepository[Article]):
"""Repository for articles with slug-based lookups."""
model_type = Article
async def get_article_by_slug(db_session: AsyncSession, slug: str) -> Article:
repository = ArticleRepository(session=db_session)
return await repository.get_by_slug(slug)
Additional methods:
get_by_slug(slug: str)- Retrieve record by slugAll standard repository methods
Characteristics:
Slug-based lookups
URL-friendly operations
Optimized for slug queries
Query Repository¶
For complex custom queries:
from advanced_alchemy.repository import SQLAlchemyAsyncQueryRepository
from sqlalchemy import select, func
async def get_posts_per_author(db_session: AsyncSession) -> list[tuple[UUID, int]]:
repository = SQLAlchemyAsyncQueryRepository(session=db_session)
stmt = select(Post.author_id, func.count(Post.id)).group_by(Post.author_id)
return await repository.list(stmt)
Characteristics:
Executes raw SELECT statements
Supports aggregations, joins, subqueries
Returns query results (not model instances for aggregations)
Useful for reporting and analytics
Implementation Patterns¶
Performance Characteristics¶
Different patterns have distinct performance profiles:
Single Insert Pattern
# Multiple individual inserts
for user_data in users:
await repository.add(User(**user_data), auto_commit=True)
# Characteristics: N commits, slower with many records, simple code
Bulk Insert Pattern
# Bulk insert
await repository.add_many(
[User(**data) for data in users],
auto_commit=True
)
# Characteristics: 1 commit, faster with many records, efficient
Choose bulk operations for multiple records.
Upsert vs Select-Then-Update¶
Two patterns for conditional updates:
Select-Then-Update Pattern
# Fetch record
post = await repository.get_one_or_none(Post.id == post_id)
if post:
# Update existing
post.title = new_title
await repository.update(post, auto_commit=True)
else:
# Create new
await repository.add(Post(id=post_id, title=new_title), auto_commit=True)
Characteristics:
Two database round-trips
Race condition possible between select and insert
Clear logic flow
Upsert Pattern
# Upsert
await repository.upsert(
{"id": post_id, "title": new_title},
match_fields=["id"],
auto_commit=True
)
Characteristics:
Single database round-trip
Atomic operation, no race conditions
Database-specific implementation
Upsert is more efficient and safer for concurrent access.
Custom Repository Methods¶
Extend repositories with custom methods:
from advanced_alchemy.repository import SQLAlchemyAsyncRepository
from sqlalchemy import select, func
class PostRepository(SQLAlchemyAsyncRepository[Post]):
"""Extended repository with custom methods."""
model_type = Post
async def get_published_count(self) -> int:
"""Get count of published posts."""
stmt = select(func.count(Post.id)).where(Post.published == True)
result = await self.session.execute(stmt)
return result.scalar_one()
async def get_recent_published(self, limit: int = 10) -> list[Post]:
"""Get recent published posts."""
return await self.list(
Post.published == True,
Post.published_at.isnot(None),
load=[selectinload(Post.tags)],
order_by=[Post.published_at.desc()],
limit=limit
)
Custom methods encapsulate domain-specific queries.
Technical Constraints¶
Bulk Operation Atomicity¶
Bulk operations are atomic within transactions:
# ✅ Correct - bulk operation is atomic
async with db_session.begin():
posts = await repository.add_many(post_instances)
# All inserts succeed or all fail
# ⚠️ Note - individual auto_commit operations are separate transactions
posts = await repository.add_many(post_instances, auto_commit=True)
# Each insert is separate transaction (database-dependent behavior)
Use manual transactions for guaranteed atomicity across bulk operations.
Upsert Match Field Requirements¶
match_fields must correspond to unique constraints:
# ✅ Correct - match_fields on unique columns
await repository.upsert(
{"email": "user@example.com", "name": "Alice"},
match_fields=["email"], # email has unique constraint
auto_commit=True
)
# ❌ Incorrect - match_fields without unique constraint
await repository.upsert(
{"name": "Alice", "age": 30},
match_fields=["name"], # name is not unique
auto_commit=True
)
# May update wrong record or fail
Ensure match_fields have unique constraints or primary key.
Database-Specific Upsert Behavior¶
Upsert implementation varies by database:
PostgreSQL: Uses
ON CONFLICT DO UPDATEMySQL: Uses
ON DUPLICATE KEY UPDATESQLite: Uses
ON CONFLICT DO UPDATE(requires SQLite 3.24.0+)Oracle: Uses
MERGEstatementSQL Server: Uses
MERGEstatement
Test upsert behavior for your target database backend.
Next Steps¶
This covers core repository functionality. Next, explore services for business logic.