Obstore Backend

Rust-based storage backend implementation with native async support.

Characteristics

  • Implementation: Rust via PyO3 bindings

  • Async Support: Native async/await

  • Supported Backends: S3, GCS, Azure, local filesystem, memory

  • Installation: pip install "advanced-alchemy[obstore]"
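
Because the backend is natively async, file operations are awaited directly on a FileObject (these method names appear throughout this page):

# All I/O methods have async variants
await file_obj.save_async()
content = await file_obj.get_content_async()
await file_obj.delete_async()
url = await file_obj.sign_async(expires_in=3600)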

Supported Backends

obstore provides native implementations for:

  • Amazon S3: AWS S3, MinIO, DigitalOcean Spaces, Cloudflare R2

  • Google Cloud Storage: GCS with service account or default credentials

  • Azure Blob Storage: Azure with connection string or account key

  • Local Filesystem: Local file storage

  • Memory: In-memory storage for testing
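
Each backend is selected by the URI scheme passed as fs when registering:

# URI schemes per backend, as used in the examples on this page
fs="s3://my-bucket/"             # Amazon S3 and S3-compatible services
fs="gs://my-bucket/"             # Google Cloud Storage
fs="az://my-container/"          # Azure Blob Storage
fs="file:///var/app/uploads/"    # Local filesystem
fs="memory://"                   # In-memory (testing)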

Installation

pip install "advanced-alchemy[obstore]"

Basic Usage

Backend Registration

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

# Register backend
storages.register_backend(ObstoreBackend(
    key="local",
    fs="file:///var/app/uploads/",
))

Using in Models

from typing import Optional

from sqlalchemy.orm import Mapped, mapped_column
from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject

class Document(UUIDAuditBase):
    __tablename__ = "documents"

    title: "Mapped[str]"
    file: "Mapped[Optional[FileObject]]" = mapped_column(
        StoredObject(backend="local")
    )
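
With the column in place, a document and its file are persisted together. A minimal sketch, assuming an async SQLAlchemy session (the explicit save_async() call mirrors the File Operations section below):

async def create_document(session) -> Document:
    file_obj = FileObject(
        backend="local",
        filename="report.txt",
        content_type="text/plain",
        content=b"hello",
    )
    await file_obj.save_async()  # upload content to the "local" backend

    doc = Document(title="Report", file=file_obj)
    session.add(doc)
    await session.commit()
    return doc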

Local Filesystem

Basic Setup

from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="local",
    fs="file:///var/app/uploads/",
))

Relative Paths

import os

# Resolve a relative path to an absolute one
upload_dir = os.path.abspath("uploads")

storages.register_backend(ObstoreBackend(
    key="local",
    fs=f"file://{upload_dir}/",
))
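
pathlib can build the same file:// URI directly; a stdlib-only sketch:

from pathlib import Path

upload_dir = Path("uploads").resolve()

storages.register_backend(ObstoreBackend(
    key="local",
    fs=f"{upload_dir.as_uri()}/",  # as_uri() emits file:///...; keep the trailing slash
))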

Amazon S3

Access Key Authentication

from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    aws_region="us-west-2",
))

IAM Role Authentication

# Use IAM role (EC2, ECS, Lambda)
storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_region="us-west-2",
    # No credentials - uses IAM role
))

S3-Compatible Services

MinIO

MinIO provides S3-compatible object storage for local development and production:

# Local development
storages.register_backend(ObstoreBackend(
    key="minio",
    fs="s3://my-bucket/",
    aws_endpoint="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    aws_region="us-east-1",
    aws_allow_http=True,  # HTTP for local development
))

# Production deployment
storages.register_backend(ObstoreBackend(
    key="minio-prod",
    fs="s3://production-bucket/",
    aws_endpoint="https://minio.example.com",
    aws_access_key_id="production-key",
    aws_secret_access_key="production-secret",
    aws_region="us-east-1",
    aws_allow_http=False,  # HTTPS in production
))

For Docker Compose setup and multi-bucket configuration, see MinIO Configuration.

Cloudflare R2

storages.register_backend(ObstoreBackend(
    key="r2",
    fs="s3://my-bucket/",
    aws_endpoint="https://account-id.r2.cloudflarestorage.com",
    aws_access_key_id="R2_ACCESS_KEY_ID",
    aws_secret_access_key="R2_SECRET_ACCESS_KEY",
))

DigitalOcean Spaces

storages.register_backend(ObstoreBackend(
    key="spaces",
    fs="s3://my-space/",
    aws_endpoint="https://nyc3.digitaloceanspaces.com",
    aws_access_key_id="SPACES_ACCESS_KEY",
    aws_secret_access_key="SPACES_SECRET_KEY",
    aws_region="us-east-1",
))

Google Cloud Storage

Service Account

from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="gcs",
    fs="gs://my-bucket/",
    google_service_account="/path/to/service-account.json",
))

Default Credentials

# Use application default credentials
storages.register_backend(ObstoreBackend(
    key="gcs",
    fs="gs://my-bucket/",
    # No credentials - uses default credentials
))

Azure Blob Storage

Connection String

from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="azure",
    fs="az://my-container/",
    azure_storage_connection_string="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net",
))

Account Key

storages.register_backend(ObstoreBackend(
    key="azure",
    fs="az://my-container/",
    azure_storage_account_name="mystorageaccount",
    azure_storage_account_key="account-key-here",
))

SAS Token

storages.register_backend(ObstoreBackend(
    key="azure",
    fs="az://my-container/",
    azure_storage_account_name="mystorageaccount",
    azure_storage_sas_token="sas-token-here",
))

Memory (Testing)

In-Memory Storage

from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="memory",
    fs="memory://",
))

File Operations

Upload with Metadata

from advanced_alchemy.types import FileObject

pdf_bytes = b"..."  # raw bytes of the PDF to store

file_obj = FileObject(
    backend="s3",
    filename="invoice.pdf",
    content_type="application/pdf",
    metadata={
        "invoice_number": "INV-2025-001",
        "customer_id": "12345",
        "amount": "1500.00",
    },
    content=pdf_bytes,
)

await file_obj.save_async()

Signed URLs

# Download URL (expires in 1 hour)
download_url = await file_obj.sign_async(expires_in=3600)

# Upload URL (expires in 5 minutes)
upload_url = await file_obj.sign_async(expires_in=300, for_upload=True)
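
A client can then upload straight to storage with an HTTP PUT against the signed upload URL. A sketch using httpx (an assumption; any HTTP client works):

import httpx

async def upload_via_signed_url(upload_url: str, data: bytes, content_type: str) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.put(
            upload_url,
            content=data,
            headers={"Content-Type": content_type},
        )
        response.raise_for_status()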

Multipart Upload

obstore automatically uses multipart for large files:

# Default settings (automatic)
await large_file.save_async()

# Custom chunk size and concurrency
await large_file.save_async(
    use_multipart=True,
    chunk_size=50 * 1024 * 1024,  # 50 MB chunks
    max_concurrency=20,
)

# Disable multipart for small files
await small_file.save_async(use_multipart=False)

Advanced Configuration

Environment Variables

import os
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ.get("AWS_REGION", "us-east-1"),
))

Startup Configuration

import os
from contextlib import asynccontextmanager

from litestar import Litestar

from advanced_alchemy.types.file_object import storages

@asynccontextmanager
async def configure_storage(app: Litestar):
    """Configure obstore on startup."""
    from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

    storages.register_backend(ObstoreBackend(
        key="documents",
        fs=f"s3://{os.environ['S3_BUCKET']}/",
        aws_region=os.environ["AWS_REGION"],
    ))

    yield

app = Litestar(route_handlers=[...], lifespan=[configure_storage])

Multiple Backends

# Documents on S3
storages.register_backend(ObstoreBackend(
    key="documents",
    fs="s3://company-documents/",
    aws_region="us-west-2",
))

# Images on GCS
storages.register_backend(ObstoreBackend(
    key="images",
    fs="gs://company-images/",
    google_service_account="/path/to/sa.json",
))

# Temporary files locally
storages.register_backend(ObstoreBackend(
    key="temp",
    fs="file:///tmp/uploads/",
))
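
Models then select a backend by key, exactly as in Using in Models (Report is an illustrative model reusing those imports):

class Report(UUIDAuditBase):
    __tablename__ = "reports"

    # Stored on S3 via the "documents" backend
    pdf: "Mapped[Optional[FileObject]]" = mapped_column(StoredObject(backend="documents"))
    # Stored on GCS via the "images" backend
    thumbnail: "Mapped[Optional[FileObject]]" = mapped_column(StoredObject(backend="images"))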

Framework Integration

File Upload Handling

With Litestar:

from litestar import post
from litestar.datastructures import UploadFile
from litestar.enums import RequestEncodingType
from litestar.params import Body
from advanced_alchemy.types import FileObject
from typing import Annotated

@post("/upload", signature_namespace={"DocumentService": DocumentService})
async def upload_file(
    data: Annotated[UploadFile, Body(media_type=RequestEncodingType.MULTI_PART)],
    service: DocumentService,
) -> Document:
    """Upload file to storage backend."""
    # Create document with file
    doc_data = {
        "title": data.filename or "untitled",
        "file": FileObject(
            backend="s3",
            filename=data.filename or "file",
            content_type=data.content_type or "application/octet-stream",
            content=await data.read(),
        ),
    }
    return await service.create(doc_data)

With FastAPI:

from fastapi import APIRouter, UploadFile, Depends
from advanced_alchemy.types import FileObject

router = APIRouter()

@router.post("/upload")
async def upload_file(
    file: UploadFile,
    service: DocumentService = Depends(get_document_service),
) -> Document:
    """Upload file to storage backend."""
    # Create document with file
    doc_data = {
        "title": file.filename or "untitled",
        "file": FileObject(
            backend="s3",
            filename=file.filename or "file",
            content_type=file.content_type or "application/octet-stream",
            content=await file.read(),
        ),
    }
    return await service.create(doc_data)

Signed URL Generation

With Litestar:

from litestar import post
from advanced_alchemy.types import FileObject

@post("/upload-url")
async def generate_upload_url(filename: str, content_type: str) -> dict[str, str | int]:
    """Generate signed upload URL for client-side upload."""
    file_obj = FileObject(
        backend="s3",
        filename=filename,
        content_type=content_type,
    )

    upload_url = await file_obj.sign_async(expires_in=3600, for_upload=True)

    return {
        "upload_url": upload_url,
        "filename": filename,
        "expires_in": 3600,
    }

With FastAPI:

from fastapi import APIRouter
from advanced_alchemy.types import FileObject

router = APIRouter()

@router.post("/upload-url")
async def generate_upload_url(filename: str, content_type: str) -> dict[str, str | int]:
    """Generate signed upload URL for client-side upload."""
    file_obj = FileObject(
        backend="s3",
        filename=filename,
        content_type=content_type,
    )

    upload_url = await file_obj.sign_async(expires_in=3600, for_upload=True)

    return {
        "upload_url": upload_url,
        "filename": filename,
        "expires_in": 3600,
    }

Testing

Memory Backend

import pytest
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

@pytest.fixture
def memory_storage():
    """Configure in-memory obstore storage."""
    backend = ObstoreBackend(key="test", fs="memory://")
    storages.register_backend(backend)
    yield backend
    storages._backends.pop("test", None)  # clean up via the private registry

async def test_file_upload(memory_storage):
    """Test file upload with obstore memory backend."""
    from advanced_alchemy.types import FileObject

    file_obj = FileObject(
        backend="test",
        filename="test.txt",
        content=b"Test content",
    )

    await file_obj.save_async()
    content = await file_obj.get_content_async()
    assert content == b"Test content"

Test Fixtures

@pytest.fixture
async def sample_file(memory_storage):
    """Create sample file for testing."""
    from advanced_alchemy.types import FileObject

    file_obj = FileObject(
        backend="test",
        filename="sample.txt",
        content_type="text/plain",
        content=b"Sample content",
    )
    await file_obj.save_async()
    return file_obj

async def test_file_operations(sample_file):
    """Test file operations."""
    content = await sample_file.get_content_async()
    assert content == b"Sample content"

    await sample_file.delete_async()

Performance Optimization

Chunk Size Configuration

# Small files (<10MB): disable multipart
await small_file.save_async(use_multipart=False)

# Medium files (10MB-1GB): use defaults
await medium_file.save_async()

# Large files (>1GB): increase chunk size
await large_file.save_async(
    chunk_size=50 * 1024 * 1024,  # 50MB
    max_concurrency=20,
)

Concurrent Operations

import asyncio

from advanced_alchemy.types import FileObject

# Upload multiple files concurrently
files = [
    FileObject(backend="s3", filename=f"file{i}.txt", content=b"data")
    for i in range(100)
]

await asyncio.gather(*[f.save_async() for f in files])
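
For very large batches, bound the number of in-flight uploads with a semaphore (a sketch; the limit of 10 is arbitrary):

semaphore = asyncio.Semaphore(10)  # at most 10 concurrent uploads

async def bounded_save(file_obj: FileObject) -> None:
    async with semaphore:
        await file_obj.save_async()

await asyncio.gather(*[bounded_save(f) for f in files])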

Connection Reuse

obstore reuses connections automatically:

# Same backend = reused connections
backend = ObstoreBackend(key="s3", fs="s3://my-bucket/")
storages.register_backend(backend)

# All operations reuse the same connection pool
await file1.save_async()
await file2.save_async()
await file3.save_async()

Common Issues

LocalStore Metadata Support

LocalStore doesn’t support content-type or metadata:

# Metadata ignored on local filesystem
file_obj = FileObject(
    backend="local",
    filename="test.txt",
    content_type="text/plain",  # Ignored
    metadata={"key": "value"},  # Ignored
    content=b"data",
)

Use S3/GCS/Azure for metadata support.

Signed URL Support

Not all backends support signed URLs:

# S3/GCS/Azure: supported
url = await file_obj.sign_async(expires_in=3600)

# LocalStore: NotImplementedError
# Use cloud storage for signed URLs
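
When the same code may run against either local or cloud storage, the error can be caught and handled (a sketch; returning None signals the caller to serve the file directly):

from typing import Optional

async def try_sign(file_obj: FileObject) -> Optional[str]:
    try:
        return await file_obj.sign_async(expires_in=3600)
    except NotImplementedError:
        # LocalStore cannot sign URLs
        return None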

Authentication Errors

Verify credentials and permissions:

# AWS credentials
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-west-2

# GCP credentials
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

Endpoint Format

Use correct URL format:

# Correct
aws_endpoint="http://localhost:9000"      # With protocol
fs="s3://bucket/"                         # With trailing slash

# Incorrect
aws_endpoint="localhost:9000"             # Missing protocol
fs="s3://bucket"                          # Missing trailing slash

Migration from FSSpec

Configuration Changes

# Before (fsspec)
import fsspec
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY",
    secret="AWS_SECRET_KEY",
)
storages.register_backend(FSSpecBackend(
    key="s3",
    fs=s3_fs,
    prefix="my-bucket",
))

# After (obstore)
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_access_key_id="AWS_ACCESS_KEY",
    aws_secret_access_key="AWS_SECRET_KEY",
))

Model Code Unchanged

# Models don't change
class Document(UUIDAuditBase):
    file: "Mapped[Optional[FileObject]]" = mapped_column(
        StoredObject(backend="s3")
    )

# FileObject API identical
await file_obj.save_async()
content = await file_obj.get_content_async()
await file_obj.delete_async()

See Also