Storage Configuration

Advanced configuration patterns for file storage backends.

Configuration Strategies

Environment-Based

Store configuration in environment variables:

import os
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

def configure_storage_from_env():
    """Configure storage from environment variables."""
    storage_backend = os.environ.get("STORAGE_BACKEND", "local")

    if storage_backend == "s3":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{os.environ['S3_BUCKET']}/",
            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
            aws_region=os.environ.get("AWS_REGION", "us-east-1"),
        ))
    elif storage_backend == "minio":
        # MinIO S3-compatible storage
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{os.environ['MINIO_BUCKET']}/",
            aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY"),
            aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY"),
            aws_endpoint=os.environ.get("MINIO_ENDPOINT", "http://localhost:9000"),
            aws_region=os.environ.get("MINIO_REGION", "us-east-1"),
            aws_allow_http=True,  # MinIO local development
        ))
    elif storage_backend == "gcs":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"gs://{os.environ['GCS_BUCKET']}/",
            google_service_account=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
        ))
    elif storage_backend == "azure":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"az://{os.environ['AZURE_CONTAINER']}/",
            azure_storage_connection_string=os.environ.get("AZURE_STORAGE_CONNECTION_STRING"),
        ))
    else:  # local
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix=os.environ.get("UPLOAD_DIR", "/var/app/uploads"),
        ))
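
For example, a hypothetical .env selecting the S3 branch above (all values are placeholders):

# .env
STORAGE_BACKEND=s3
S3_BUCKET=my-app-uploads
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_REGION=us-east-1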

Configuration File

Load configuration from a YAML, TOML, or JSON file (TOML shown here):

import toml

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

def configure_storage_from_file(config_path: str):
    """Configure storage from a TOML file."""
    config = toml.load(config_path)

    for backend_config in config["storage"]["backends"]:
        if backend_config["type"] == "s3":
            storages.register_backend(ObstoreBackend(
                key=backend_config["key"],
                fs=backend_config["bucket"],
                aws_region=backend_config["region"],
            ))
Example configuration file:

# config.toml
[storage]
[[storage.backends]]
key = "documents"
type = "s3"
bucket = "s3://company-documents/"
region = "us-west-2"

[[storage.backends]]
key = "images"
type = "gcs"
bucket = "gs://company-images/"
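
The loader above registers only the s3 entries; a minimal sketch of a per-entry helper that also covers the gcs entry in this file (register_backend_from_config is a hypothetical name, reusing the loader's imports):

def register_backend_from_config(backend_config: dict) -> None:
    """Register a single backend entry from the parsed config (sketch)."""
    if backend_config["type"] == "s3":
        storages.register_backend(ObstoreBackend(
            key=backend_config["key"],
            fs=backend_config["bucket"],
            aws_region=backend_config["region"],
        ))
    elif backend_config["type"] == "gcs":
        storages.register_backend(ObstoreBackend(
            key=backend_config["key"],
            fs=backend_config["bucket"],
        ))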

Pydantic Settings

Use Pydantic for configuration validation:

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from advanced_alchemy.types.file_object import storages

class StorageSettings(BaseSettings):
    """Storage configuration settings."""

    model_config = SettingsConfigDict(env_prefix="STORAGE_")

    backend: str = Field(default="local")
    s3_bucket: str | None = Field(default=None)
    s3_region: str = Field(default="us-east-1")
    aws_access_key_id: str | None = Field(default=None)
    aws_secret_access_key: str | None = Field(default=None)
    upload_dir: str = Field(default="/var/app/uploads")

def configure_storage(settings: StorageSettings):
    """Configure storage from Pydantic settings."""
    if settings.backend == "s3":
        from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{settings.s3_bucket}/",
            aws_access_key_id=settings.aws_access_key_id,
            aws_secret_access_key=settings.aws_secret_access_key,
            aws_region=settings.s3_region,
        ))
    else:
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix=settings.upload_dir,
        ))
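
Settings can then be loaded once at startup:

settings = StorageSettings()  # reads STORAGE_*-prefixed environment variables
configure_storage(settings)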

MinIO Configuration

MinIO is an S3-compatible object storage server suitable for both local development and production deployments.

Local Development with Docker

Docker Compose configuration for MinIO:

# docker-compose.yml
services:
  object-storage:
    image: quay.io/minio/minio
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
    volumes:
      - object-storage-data:/data
    ports:
      - "9000:9000"  # API (quoted to avoid YAML parsing colon-separated numbers)
      - "9001:9001"  # Console UI
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  object-storage-initializer:
    image: minio/mc:latest
    depends_on:
      object-storage:
        condition: service_healthy
    environment:
      MINIO_SERVER_URL: http://object-storage:9000
      MINIO_ACCESS_KEY: ${MINIO_ROOT_USER:-minioadmin}
      MINIO_SECRET_KEY: ${MINIO_ROOT_PASSWORD:-minioadmin}
    volumes:
      - ./minio-config.sh:/scripts/minio-config.sh
    entrypoint: /bin/sh
    command: /scripts/minio-config.sh

volumes:
  object-storage-data:

Bucket initialization script:

#!/bin/sh
# minio-config.sh
set -e

MINIO_ALIAS="object-storage"
BUCKET_PUBLIC="public"
BUCKET_PRIVATE="private"

# Configure MinIO client
mc alias set $MINIO_ALIAS $MINIO_SERVER_URL $MINIO_ACCESS_KEY $MINIO_SECRET_KEY

# Create public bucket with anonymous download access
# (current mc releases use "mc anonymous" in place of the removed "mc policy")
mc mb $MINIO_ALIAS/$BUCKET_PUBLIC || echo "Bucket $BUCKET_PUBLIC exists"
mc anonymous set download $MINIO_ALIAS/$BUCKET_PUBLIC

# Create private bucket with no anonymous access
mc mb $MINIO_ALIAS/$BUCKET_PRIVATE || echo "Bucket $BUCKET_PRIVATE exists"
mc anonymous set none $MINIO_ALIAS/$BUCKET_PRIVATE

Application Configuration

Environment variables:

# .env
PUBLIC_STORAGE_URI=s3://public/
PUBLIC_STORAGE_OPTIONS={"aws_access_key_id":"minioadmin","aws_secret_access_key":"minioadmin","aws_endpoint":"http://localhost:9000","aws_region":"us-east-1","aws_allow_http":true}

PRIVATE_STORAGE_URI=s3://private/
PRIVATE_STORAGE_OPTIONS={"aws_access_key_id":"minioadmin","aws_secret_access_key":"minioadmin","aws_endpoint":"http://localhost:9000","aws_region":"us-east-1","aws_allow_http":true}

Settings dataclass:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class StorageSettings:
    """Storage configuration settings."""

    PUBLIC_STORAGE_KEY: str = field(default="public")
    PUBLIC_STORAGE_URI: str = field(default="s3://public/")
    PUBLIC_STORAGE_OPTIONS: dict[str, Any] = field(default_factory=dict)

    PRIVATE_STORAGE_KEY: str = field(default="private")
    PRIVATE_STORAGE_URI: str = field(default="s3://private/")
    PRIVATE_STORAGE_OPTIONS: dict[str, Any] = field(default_factory=dict)
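
The OPTIONS fields carry keyword arguments for the backend constructor. A minimal sketch of populating them from the JSON environment variables above (load_storage_settings is a hypothetical helper):

import json
import os

def load_storage_settings() -> StorageSettings:
    """Build storage settings from the environment (sketch)."""
    return StorageSettings(
        PUBLIC_STORAGE_URI=os.environ.get("PUBLIC_STORAGE_URI", "s3://public/"),
        PUBLIC_STORAGE_OPTIONS=json.loads(os.environ.get("PUBLIC_STORAGE_OPTIONS", "{}")),
        PRIVATE_STORAGE_URI=os.environ.get("PRIVATE_STORAGE_URI", "s3://private/"),
        PRIVATE_STORAGE_OPTIONS=json.loads(os.environ.get("PRIVATE_STORAGE_OPTIONS", "{}")),
    )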

Backend registration:

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

def configure_minio_storage(settings: StorageSettings):
    """Configure MinIO storage backends."""
    # Public storage
    storages.register_backend(ObstoreBackend(
        key=settings.PUBLIC_STORAGE_KEY,
        fs=settings.PUBLIC_STORAGE_URI,
        **settings.PUBLIC_STORAGE_OPTIONS
    ))

    # Private storage
    storages.register_backend(ObstoreBackend(
        key=settings.PRIVATE_STORAGE_KEY,
        fs=settings.PRIVATE_STORAGE_URI,
        **settings.PRIVATE_STORAGE_OPTIONS
    ))

Model usage:

from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject
from sqlalchemy.orm import Mapped, mapped_column

class TeamFile(UUIDAuditBase):
    """Team file with private storage."""

    __tablename__ = "team_file"

    name: Mapped[str]
    file: Mapped[FileObject] = mapped_column(StoredObject(backend="private"))

    # An async method rather than a @property: ``await`` is not valid
    # inside a synchronous property getter.
    async def get_url(self) -> str:
        """Generate a signed URL for file access."""
        return await self.file.sign_async(expires_in=3600)
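
Creating a record then looks roughly like this (a sketch: session is an active AsyncSession, pdf_bytes is loaded elsewhere, and the StoredObject type is assumed to persist pending content when the session commits):

from advanced_alchemy.types import FileObject

team_file = TeamFile(
    name="Q3 report",
    file=FileObject(
        backend="private",
        filename="q3-report.pdf",
        content_type="application/pdf",
        content=pdf_bytes,  # bytes loaded elsewhere
    ),
)
session.add(team_file)
await session.commit()
signed_url = await team_file.get_url()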

Production Configuration

For production MinIO deployments:

# Production settings
MINIO_ENDPOINT = "https://minio.example.com"
MINIO_ACCESS_KEY = "production-access-key"  # Use secrets manager
MINIO_SECRET_KEY = "production-secret-key"  # Use secrets manager

storages.register_backend(ObstoreBackend(
    key="default",
    fs="s3://production-bucket/",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    aws_endpoint=MINIO_ENDPOINT,
    aws_region="us-east-1",
    aws_allow_http=False,  # HTTPS in production
))

Framework Integration

Litestar Lifespan

from contextlib import asynccontextmanager
from litestar import Litestar

@asynccontextmanager
async def storage_lifespan(app: Litestar):
    """Configure storage on application startup."""
    configure_storage_from_env()
    yield
    # Cleanup if needed

app = Litestar(
    route_handlers=[...],
    lifespan=[storage_lifespan],
)

FastAPI Lifespan

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Configure storage on application startup."""
    configure_storage_from_env()
    yield

app = FastAPI(lifespan=lifespan)

Flask Application Factory

from flask import Flask

def create_app(config_name: str = "development"):
    """Flask application factory."""
    app = Flask(__name__)

    # Configure storage
    with app.app_context():
        configure_storage_from_env()

    return app

Multiple Backend Strategies

Backend per Use Case

import os

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

def configure_multiple_backends():
    """Configure different backends for different use cases."""
    # User uploads on S3
    storages.register_backend(ObstoreBackend(
        key="user-uploads",
        fs="s3://user-uploads/",
        aws_region="us-west-2",
    ))

    # Product images on GCS
    storages.register_backend(ObstoreBackend(
        key="product-images",
        fs="gs://product-images/",
    ))

    # Documents on Azure
    storages.register_backend(ObstoreBackend(
        key="documents",
        fs="az://documents/",
        azure_storage_connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    ))

    # Temporary files locally
    storages.register_backend(FSSpecBackend(
        key="temp",
        fs="file",
        prefix="/tmp/uploads",
    ))
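
Models can then point different columns at different backends, mirroring the model pattern shown earlier:

from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject
from sqlalchemy.orm import Mapped, mapped_column

class Product(UUIDAuditBase):
    """Product with media split across backends."""

    __tablename__ = "product"

    name: Mapped[str]
    image: Mapped[FileObject] = mapped_column(StoredObject(backend="product-images"))
    manual: Mapped[FileObject] = mapped_column(StoredObject(backend="documents"))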

Backend per Environment

def configure_environment_backends(environment: str):
    """Configure backends based on environment."""
    if environment == "production":
        # Production: cloud storage
        storages.register_backend(ObstoreBackend(
            key="default",
            fs="s3://production-uploads/",
            aws_region="us-west-2",
        ))
    elif environment == "staging":
        # Staging: separate bucket
        storages.register_backend(ObstoreBackend(
            key="default",
            fs="s3://staging-uploads/",
            aws_region="us-west-2",
        ))
    else:
        # Development/testing: local storage
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix="/tmp/dev-uploads",
        ))
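
Call it once at startup with the deployment environment (APP_ENV is an assumed variable name):

import os

configure_environment_backends(os.environ.get("APP_ENV", "development"))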

Security Configuration

Credential Management

# Environment variables (recommended)
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

# AWS credentials file (~/.aws/credentials)
[default]
aws_access_key_id = your-access-key
aws_secret_access_key = your-secret-key

# IAM roles (EC2, ECS, Lambda)
# No credentials needed - automatically provided
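
With IAM roles, the explicit credential arguments can simply be omitted; a sketch, assuming the backend falls back to the default AWS credential chain when no keys are passed:

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

# No explicit keys: credentials are resolved from the instance/task role
storages.register_backend(ObstoreBackend(
    key="default",
    fs="s3://my-bucket/",
    aws_region="us-west-2",
))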

Secrets Management

# AWS Secrets Manager
import json

import boto3

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

def get_storage_credentials():
    """Retrieve credentials from AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name="us-west-2")
    response = client.get_secret_value(SecretId="storage-credentials")
    return json.loads(response["SecretString"])

credentials = get_storage_credentials()
storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_access_key_id=credentials["access_key_id"],
    aws_secret_access_key=credentials["secret_access_key"],
    aws_region="us-west-2",
))

Encryption at Rest

# S3 server-side encryption
import os

import fsspec

s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    s3_additional_kwargs={
        "ServerSideEncryption": "AES256",
    },
)

from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
storages.register_backend(FSSpecBackend(
    key="s3-encrypted",
    fs=s3_fs,
    prefix="my-bucket",
))

CORS Configuration

S3 CORS for Direct Upload

{
    "CORSRules": [
        {
            "AllowedOrigins": ["https://app.example.com"],
            "AllowedMethods": ["GET", "PUT", "POST"],
            "AllowedHeaders": ["*"],
            "ExposeHeaders": ["ETag"],
            "MaxAgeSeconds": 3000
        }
    ]
}
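
The rules can be applied with boto3 (bucket name is a placeholder):

import boto3

s3 = boto3.client("s3")
s3.put_bucket_cors(
    Bucket="my-bucket",
    CORSConfiguration={
        "CORSRules": [
            {
                "AllowedOrigins": ["https://app.example.com"],
                "AllowedMethods": ["GET", "PUT", "POST"],
                "AllowedHeaders": ["*"],
                "ExposeHeaders": ["ETag"],
                "MaxAgeSeconds": 3000,
            }
        ]
    },
)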

Signed URL Pattern

from litestar import post
from advanced_alchemy.types import FileObject

@post("/upload-url")
async def generate_upload_url(
    filename: str,
    content_type: str,
) -> "dict[str, str]":
    """Generate signed upload URL with CORS support."""
    file_obj = FileObject(
        backend="s3",
        filename=filename,
        content_type=content_type,
    )

    upload_url = await file_obj.sign_async(expires_in=300, for_upload=True)

    return {
        "upload_url": upload_url,
        "filename": filename,
        "content_type": content_type,
        "expires_in": 300,
    }

Performance Optimization

Connection Pooling

import os

import fsspec

# fsspec: configure the botocore connection pool
s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    config_kwargs={
        "max_pool_connections": 50,
        "connect_timeout": 60,
        "read_timeout": 60,
    },
)
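
The pooled filesystem is then registered like any other fsspec backend:

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

storages.register_backend(FSSpecBackend(
    key="s3-pooled",
    fs=s3_fs,
    prefix="my-bucket",
))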

Caching

import os

import fsspec

# Cache remote files locally
s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
)

# Wrap the S3 filesystem in a local file cache. Pass either ``fs`` or
# ``target_protocol``; fsspec rejects both at once.
cached_fs = fsspec.filesystem(
    "filecache",
    fs=s3_fs,
    cache_storage="/tmp/fsspec_cache",
)

from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
storages.register_backend(FSSpecBackend(
    key="s3-cached",
    fs=cached_fs,
    prefix="my-bucket",
))

Multipart Configuration

# obstore: configure multipart thresholds
from advanced_alchemy.types import FileObject

# Large files: increase chunk size and upload parallelism
large_file = FileObject(
    backend="s3",
    filename="video.mp4",
    content=video_bytes,  # bytes loaded elsewhere
)

await large_file.save_async(
    chunk_size=50 * 1024 * 1024,  # 50 MB chunks
    max_concurrency=20,
)

Monitoring and Logging

Storage Metrics

import logging
import time

from advanced_alchemy.types import FileObject

logger = logging.getLogger(__name__)

async def upload_with_metrics(file_obj: FileObject):
    """Upload file with metrics logging."""
    start = time.time()

    try:
        await file_obj.save_async()
        duration = time.time() - start

        logger.info(
            "file uploaded",
            extra={
                "filename": file_obj.filename,
                "size": file_obj.size,
                "backend": file_obj.backend,
                "duration_ms": duration * 1000,
            }
        )
    except Exception as e:
        logger.error(
            "file upload failed",
            extra={
                "filename": file_obj.filename,
                "backend": file_obj.backend,
                "error": str(e),
            }
        )
        raise

Error Handling

import asyncio
import logging

from advanced_alchemy.types import FileObject

logger = logging.getLogger(__name__)

async def safe_upload(file_obj: FileObject, max_retries: int = 3):
    """Upload file with retry logic."""
    for attempt in range(max_retries):
        try:
            await file_obj.save_async()
            return
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(
                f"upload attempt {attempt + 1} failed, retrying",
                extra={"error": str(e)}
            )
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

Testing Configuration

Test Fixtures

import pytest
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

@pytest.fixture(autouse=True)
def configure_test_storage():
    """Configure memory storage for all tests."""
    backend = FSSpecBackend(key="test", fs="memory")
    storages.register_backend(backend)
    yield
    storages._backends.clear()
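
A test can then write and read files against the in-memory backend, for example:

from advanced_alchemy.types import FileObject

async def test_roundtrip():
    """Round-trip a file through the memory backend."""
    file_obj = FileObject(backend="test", filename="hello.txt", content=b"hello")
    await file_obj.save_async()
    assert await file_obj.get_content_async() == b"hello"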

Environment-Specific Tests

import pytest
import os

@pytest.fixture
def configure_storage_for_environment():
    """Configure storage based on test environment."""
    if os.environ.get("USE_REAL_S3") == "true":
        # Integration tests with real S3
        from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend
        storages.register_backend(ObstoreBackend(
            key="test",
            fs="s3://test-bucket/",
            aws_region="us-west-2",
        ))
    else:
        # Unit tests with memory storage
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
        storages.register_backend(FSSpecBackend(
            key="test",
            fs="memory"
        ))

    yield
    storages._backends.clear()

Migration Strategies

Gradual Migration

# Register both old and new backends
def configure_migration_backends():
    """Configure both local and S3 for gradual migration."""
    # Old backend (local)
    from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
    storages.register_backend(FSSpecBackend(
        key="local-legacy",
        fs="file",
        prefix="/var/app/uploads",
    ))

    # New backend (S3)
    from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend
    storages.register_backend(ObstoreBackend(
        key="s3-new",
        fs="s3://new-uploads/",
        aws_region="us-west-2",
    ))

# Use a feature flag or gradual rollout
from uuid import UUID

def get_storage_backend(user_id: UUID) -> str:
    """Determine the storage backend for a user."""
    # is_migrated_user is an application-specific feature-flag check
    if is_migrated_user(user_id):
        return "s3-new"
    return "local-legacy"

Data Migration Script

async def migrate_files_to_s3():
    """Migrate files from local storage to S3."""
    from sqlalchemy import select

    from advanced_alchemy.types import FileObject

    # Assumes an active AsyncSession bound to ``session`` and a
    # ``Document`` model with a StoredObject ``file`` column.
    stmt = select(Document).where(Document.file.isnot(None))
    result = await session.execute(stmt)
    documents = list(result.scalars())

    for doc in documents:
        if doc.file.backend == "local-legacy":
            # Get file content
            content = await doc.file.get_content_async()

            # Create new file on S3
            new_file = FileObject(
                backend="s3-new",
                filename=doc.file.filename,
                content_type=doc.file.content_type,
                metadata=doc.file.metadata,
                content=content,
            )

            # Save to S3
            await new_file.save_async()

            # Delete old file
            await doc.file.delete_async()

            # Update document
            doc.file = new_file
            await session.commit()

            logger.info(f"migrated file: {doc.file.filename}")

See Also