Storage Configuration¶
Advanced configuration patterns for file storage backends.
Configuration Strategies¶
Environment-Based¶
Store configuration in environment variables:
import os

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend


def configure_storage_from_env():
    """Configure storage from environment variables."""
    storage_backend = os.environ.get("STORAGE_BACKEND", "local")

    if storage_backend == "s3":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{os.environ['S3_BUCKET']}/",
            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
            aws_region=os.environ.get("AWS_REGION", "us-east-1"),
        ))
    elif storage_backend == "minio":
        # MinIO S3-compatible storage
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{os.environ['MINIO_BUCKET']}/",
            aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY"),
            aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY"),
            aws_endpoint=os.environ.get("MINIO_ENDPOINT", "http://localhost:9000"),
            aws_region=os.environ.get("MINIO_REGION", "us-east-1"),
            aws_allow_http=True,  # MinIO local development
        ))
    elif storage_backend == "gcs":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"gs://{os.environ['GCS_BUCKET']}/",
            google_service_account=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
        ))
    elif storage_backend == "azure":
        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"az://{os.environ['AZURE_CONTAINER']}/",
            azure_storage_connection_string=os.environ.get("AZURE_STORAGE_CONNECTION_STRING"),
        ))
    else:  # local
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix=os.environ.get("UPLOAD_DIR", "/var/app/uploads"),
        ))
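Once the "default" key is registered, models reference it by name. A minimal usage sketch, assuming an illustrative Document model; call the configuration function once at startup, before any file is saved:

from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject
from sqlalchemy.orm import Mapped, mapped_column


class Document(UUIDAuditBase):
    """Example model storing files on the "default" backend."""

    __tablename__ = "document"

    file: Mapped[FileObject] = mapped_column(StoredObject(backend="default"))


# Run once at application startup, before any FileObject is saved.
configure_storage_from_env()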
Configuration File¶
Load configuration from a file such as YAML, TOML, or JSON (TOML shown below):
import toml

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend


def configure_storage_from_file(config_path: str):
    """Configure storage from TOML file."""
    config = toml.load(config_path)

    for backend_config in config["storage"]["backends"]:
        if backend_config["type"] == "s3":
            storages.register_backend(ObstoreBackend(
                key=backend_config["key"],
                fs=backend_config["bucket"],
                aws_region=backend_config["region"],
            ))
# config.toml
[storage]
[[storage.backends]]
key = "documents"
type = "s3"
bucket = "s3://company-documents/"
region = "us-west-2"
[[storage.backends]]
key = "images"
type = "gcs"
bucket = "gs://company-images/"
Pydantic Settings¶
Use Pydantic for configuration validation:
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from advanced_alchemy.types.file_object import storages


class StorageSettings(BaseSettings):
    """Storage configuration settings."""

    model_config = SettingsConfigDict(env_prefix="STORAGE_")

    backend: str = Field(default="local")
    s3_bucket: str | None = Field(default=None)
    s3_region: str = Field(default="us-east-1")
    aws_access_key_id: str | None = Field(default=None)
    aws_secret_access_key: str | None = Field(default=None)
    upload_dir: str = Field(default="/var/app/uploads")


def configure_storage(settings: StorageSettings):
    """Configure storage from Pydantic settings."""
    if settings.backend == "s3":
        from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

        storages.register_backend(ObstoreBackend(
            key="default",
            fs=f"s3://{settings.s3_bucket}/",
            aws_access_key_id=settings.aws_access_key_id,
            aws_secret_access_key=settings.aws_secret_access_key,
            aws_region=settings.s3_region,
        ))
    else:
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix=settings.upload_dir,
        ))
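Settings are read from STORAGE_-prefixed environment variables when the class is instantiated; a short usage sketch:

# Reads STORAGE_BACKEND, STORAGE_S3_BUCKET, STORAGE_UPLOAD_DIR, etc.
settings = StorageSettings()
configure_storage(settings)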
MinIO Configuration¶
MinIO is an S3-compatible object storage server for local development and production.
Local Development with Docker¶
Docker Compose configuration for MinIO:
# docker-compose.yml
services:
  object-storage:
    image: quay.io/minio/minio
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
    volumes:
      - object-storage-data:/data
    ports:
      - 9000:9000  # API
      - 9001:9001  # Console UI
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  object-storage-initializer:
    image: minio/mc:latest
    depends_on:
      object-storage:
        condition: service_healthy
    environment:
      MINIO_SERVER_URL: http://object-storage:9000
      MINIO_ACCESS_KEY: ${MINIO_ROOT_USER:-minioadmin}
      MINIO_SECRET_KEY: ${MINIO_ROOT_PASSWORD:-minioadmin}
    volumes:
      - ./minio-config.sh:/scripts/minio-config.sh
    entrypoint: /bin/sh
    command: /scripts/minio-config.sh

volumes:
  object-storage-data:
Bucket initialization script:
#!/bin/sh
# minio-config.sh
set -e
MINIO_ALIAS="object-storage"
BUCKET_PUBLIC="public"
BUCKET_PRIVATE="private"
# Configure MinIO client
mc alias set $MINIO_ALIAS $MINIO_SERVER_URL $MINIO_ACCESS_KEY $MINIO_SECRET_KEY

# Create public bucket
mc mb $MINIO_ALIAS/$BUCKET_PUBLIC || echo "Bucket $BUCKET_PUBLIC exists"
mc anonymous set download $MINIO_ALIAS/$BUCKET_PUBLIC

# Create private bucket
mc mb $MINIO_ALIAS/$BUCKET_PRIVATE || echo "Bucket $BUCKET_PRIVATE exists"
mc anonymous set none $MINIO_ALIAS/$BUCKET_PRIVATE
Application Configuration¶
Environment variables:
# .env
PUBLIC_STORAGE_URI=s3://public/
PUBLIC_STORAGE_OPTIONS={"aws_access_key_id":"minioadmin","aws_secret_access_key":"minioadmin","aws_endpoint":"http://localhost:9000","aws_region":"us-east-1","aws_allow_http":true}
PRIVATE_STORAGE_URI=s3://private/
PRIVATE_STORAGE_OPTIONS={"aws_access_key_id":"minioadmin","aws_secret_access_key":"minioadmin","aws_endpoint":"http://localhost:9000","aws_region":"us-east-1","aws_allow_http":true}
Settings dataclass:
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StorageSettings:
    """Storage configuration settings."""

    PUBLIC_STORAGE_KEY: str = field(default="public")
    PUBLIC_STORAGE_URI: str = field(default="s3://public/")
    PUBLIC_STORAGE_OPTIONS: dict[str, Any] = field(default_factory=dict)
    PRIVATE_STORAGE_KEY: str = field(default="private")
    PRIVATE_STORAGE_URI: str = field(default="s3://private/")
    PRIVATE_STORAGE_OPTIONS: dict[str, Any] = field(default_factory=dict)
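The *_STORAGE_OPTIONS values in .env are JSON strings, so they need to be decoded when the settings object is built. A minimal sketch, assuming a hypothetical storage_settings_from_env() helper:

import json
import os


def storage_settings_from_env() -> StorageSettings:
    """Build StorageSettings from environment variables, decoding JSON options."""
    return StorageSettings(
        PUBLIC_STORAGE_URI=os.environ.get("PUBLIC_STORAGE_URI", "s3://public/"),
        PUBLIC_STORAGE_OPTIONS=json.loads(os.environ.get("PUBLIC_STORAGE_OPTIONS", "{}")),
        PRIVATE_STORAGE_URI=os.environ.get("PRIVATE_STORAGE_URI", "s3://private/"),
        PRIVATE_STORAGE_OPTIONS=json.loads(os.environ.get("PRIVATE_STORAGE_OPTIONS", "{}")),
    )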
Backend registration:
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend


def configure_minio_storage(settings: StorageSettings):
    """Configure MinIO storage backends."""
    # Public storage
    storages.register_backend(ObstoreBackend(
        key=settings.PUBLIC_STORAGE_KEY,
        fs=settings.PUBLIC_STORAGE_URI,
        **settings.PUBLIC_STORAGE_OPTIONS,
    ))

    # Private storage
    storages.register_backend(ObstoreBackend(
        key=settings.PRIVATE_STORAGE_KEY,
        fs=settings.PRIVATE_STORAGE_URI,
        **settings.PRIVATE_STORAGE_OPTIONS,
    ))
Model usage:
from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject
from sqlalchemy.orm import Mapped, mapped_column


class TeamFile(UUIDAuditBase):
    """Team file with private storage."""

    __tablename__ = "team_file"

    name: Mapped[str]
    file: Mapped[FileObject] = mapped_column(StoredObject(backend="private"))

    async def signed_url(self) -> str:
        """Generate a signed URL for file access."""
        return await self.file.sign_async(expires_in=3600)
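A brief usage sketch in an async context (the get_team_file_url helper and its arguments are illustrative):

from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession


async def get_team_file_url(session: AsyncSession, team_file_id: UUID) -> str:
    """Load a TeamFile and return a short-lived signed URL."""
    team_file = await session.get(TeamFile, team_file_id)
    if team_file is None:
        raise ValueError("team file not found")
    return await team_file.signed_url()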
Production Configuration¶
For production MinIO deployments:
# Production settings
MINIO_ENDPOINT = "https://minio.example.com"
MINIO_ACCESS_KEY = "production-access-key"  # Use secrets manager
MINIO_SECRET_KEY = "production-secret-key"  # Use secrets manager

storages.register_backend(ObstoreBackend(
    key="default",
    fs="s3://production-bucket/",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    aws_endpoint=MINIO_ENDPOINT,
    aws_region="us-east-1",
    aws_allow_http=False,  # HTTPS in production
))
Framework Integration¶
Litestar Lifespan¶
from contextlib import asynccontextmanager

from litestar import Litestar


@asynccontextmanager
async def storage_lifespan(app: Litestar):
    """Configure storage on application startup."""
    configure_storage_from_env()
    yield
    # Cleanup if needed


app = Litestar(
    route_handlers=[...],
    lifespan=[storage_lifespan],
)
FastAPI Lifespan¶
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Configure storage on application startup."""
    configure_storage_from_env()
    yield


app = FastAPI(lifespan=lifespan)
Flask Application Factory¶
from flask import Flask


def create_app(config_name: str = "development"):
    """Flask application factory."""
    app = Flask(__name__)

    # Configure storage
    with app.app_context():
        configure_storage_from_env()

    return app
Multiple Backend Strategies¶
Backend per Use Case¶
import os

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend


def configure_multiple_backends():
    """Configure different backends for different use cases."""
    # User uploads on S3
    storages.register_backend(ObstoreBackend(
        key="user-uploads",
        fs="s3://user-uploads/",
        aws_region="us-west-2",
    ))

    # Product images on GCS
    storages.register_backend(ObstoreBackend(
        key="product-images",
        fs="gs://product-images/",
    ))

    # Documents on Azure
    storages.register_backend(ObstoreBackend(
        key="documents",
        fs="az://documents/",
        azure_storage_connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    ))

    # Temporary files locally
    storages.register_backend(FSSpecBackend(
        key="temp",
        fs="file",
        prefix="/tmp/uploads",
    ))
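Models then select a backend by key. A short sketch reusing the keys registered above (model names are illustrative):

from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject
from sqlalchemy.orm import Mapped, mapped_column


class UserUpload(UUIDAuditBase):
    """User-submitted file stored on the S3 backend."""

    __tablename__ = "user_upload"

    file: Mapped[FileObject] = mapped_column(StoredObject(backend="user-uploads"))


class ProductImage(UUIDAuditBase):
    """Product image stored on the GCS backend."""

    __tablename__ = "product_image"

    image: Mapped[FileObject] = mapped_column(StoredObject(backend="product-images"))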
Backend per Environment¶
def configure_environment_backends(environment: str):
    """Configure backends based on environment."""
    if environment == "production":
        # Production: cloud storage
        storages.register_backend(ObstoreBackend(
            key="default",
            fs="s3://production-uploads/",
            aws_region="us-west-2",
        ))
    elif environment == "staging":
        # Staging: separate bucket
        storages.register_backend(ObstoreBackend(
            key="default",
            fs="s3://staging-uploads/",
            aws_region="us-west-2",
        ))
    else:
        # Development/testing: local storage
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

        storages.register_backend(FSSpecBackend(
            key="default",
            fs="file",
            prefix="/tmp/dev-uploads",
        ))
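This is typically called once at startup with the deployment environment; a brief sketch assuming an APP_ENV variable:

import os

configure_environment_backends(os.environ.get("APP_ENV", "development"))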
Security Configuration¶
Credential Management¶
# Environment variables (recommended)
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

# AWS credentials file (~/.aws/credentials)
[default]
aws_access_key_id = your-access-key
aws_secret_access_key = your-secret-key

# IAM roles (EC2, ECS, Lambda)
# No credentials needed - automatically provided
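When credentials come from an IAM role or the standard AWS environment variables, they generally do not need to be passed explicitly. A minimal sketch, assuming the backend resolves ambient AWS credentials (the bucket name is illustrative):

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

# No explicit keys: credentials are picked up from the instance role or
# from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in the environment.
storages.register_backend(ObstoreBackend(
    key="default",
    fs="s3://my-bucket/",
    aws_region="us-west-2",
))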
Secrets Management¶
# AWS Secrets Manager
import json

import boto3

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend


def get_storage_credentials():
    """Retrieve credentials from AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name="us-west-2")
    response = client.get_secret_value(SecretId="storage-credentials")
    return json.loads(response["SecretString"])


credentials = get_storage_credentials()

storages.register_backend(ObstoreBackend(
    key="s3",
    fs="s3://my-bucket/",
    aws_access_key_id=credentials["access_key_id"],
    aws_secret_access_key=credentials["secret_access_key"],
    aws_region="us-west-2",
))
Encryption at Rest¶
# S3 server-side encryption
import os

import fsspec

from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    s3_additional_kwargs={
        "ServerSideEncryption": "AES256",
    },
)

storages.register_backend(FSSpecBackend(
    key="s3-encrypted",
    fs=s3_fs,
    prefix="my-bucket",
))
CORS Configuration¶
S3 CORS for Direct Upload¶
{
    "CORSRules": [
        {
            "AllowedOrigins": ["https://app.example.com"],
            "AllowedMethods": ["GET", "PUT", "POST"],
            "AllowedHeaders": ["*"],
            "ExposeHeaders": ["ETag"],
            "MaxAgeSeconds": 3000
        }
    ]
}
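The policy can be applied with the AWS CLI or boto3; a brief sketch using boto3's put_bucket_cors (the bucket name is illustrative):

import boto3

s3_client = boto3.client("s3")
s3_client.put_bucket_cors(
    Bucket="user-uploads",
    CORSConfiguration={
        "CORSRules": [
            {
                "AllowedOrigins": ["https://app.example.com"],
                "AllowedMethods": ["GET", "PUT", "POST"],
                "AllowedHeaders": ["*"],
                "ExposeHeaders": ["ETag"],
                "MaxAgeSeconds": 3000,
            }
        ]
    },
)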
Signed URL Pattern¶
from litestar import post

from advanced_alchemy.types import FileObject


@post("/upload-url")
async def generate_upload_url(
    filename: str,
    content_type: str,
) -> "dict[str, str | int]":
    """Generate signed upload URL with CORS support."""
    file_obj = FileObject(
        backend="s3",
        filename=filename,
        content_type=content_type,
    )

    upload_url = await file_obj.sign_async(expires_in=300, for_upload=True)

    return {
        "upload_url": upload_url,
        "filename": filename,
        "content_type": content_type,
        "expires_in": 300,
    }
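The client then uploads directly to the returned URL. A minimal client-side sketch using httpx (assumed to be available; any HTTP client works):

import httpx


async def upload_via_signed_url(upload_url: str, data: bytes, content_type: str) -> None:
    """PUT the file body directly to object storage via the signed URL."""
    async with httpx.AsyncClient() as client:
        response = await client.put(
            upload_url,
            content=data,
            headers={"Content-Type": content_type},
        )
        response.raise_for_status()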
Performance Optimization¶
Connection Pooling¶
import os

import fsspec

# fsspec: configure connection pool
s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    config_kwargs={
        "max_pool_connections": 50,
        "connect_timeout": 60,
        "read_timeout": 60,
    },
)
Caching¶
import os

import fsspec

from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

# Cache remote files locally
s3_fs = fsspec.filesystem(
    "s3",
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
)

cached_fs = fsspec.filesystem(
    "filecache",
    fs=s3_fs,
    cache_storage="/tmp/fsspec_cache",
)

storages.register_backend(FSSpecBackend(
    key="s3-cached",
    fs=cached_fs,
    prefix="my-bucket",
))
Multipart Configuration¶
# obstore: configure multipart thresholds
from advanced_alchemy.types import FileObject

# Large files: increase chunk size
# (video_bytes holds the raw file content read elsewhere)
large_file = FileObject(
    backend="s3",
    filename="video.mp4",
    content=video_bytes,
)

await large_file.save_async(
    chunk_size=50 * 1024 * 1024,  # 50 MB chunks
    max_concurrency=20,
)
Monitoring and Logging¶
Storage Metrics¶
import logging
import time

from advanced_alchemy.types import FileObject

logger = logging.getLogger(__name__)


async def upload_with_metrics(file_obj: FileObject):
    """Upload file with metrics logging."""
    start = time.time()
    try:
        await file_obj.save_async()
        duration = time.time() - start
        logger.info(
            "file uploaded",
            extra={
                "filename": file_obj.filename,
                "size": file_obj.size,
                "backend": file_obj.backend,
                "duration_ms": duration * 1000,
            },
        )
    except Exception as e:
        logger.error(
            "file upload failed",
            extra={
                "filename": file_obj.filename,
                "backend": file_obj.backend,
                "error": str(e),
            },
        )
        raise
Error Handling¶
import asyncio
import logging

from advanced_alchemy.types import FileObject

logger = logging.getLogger(__name__)


async def safe_upload(file_obj: FileObject, max_retries: int = 3):
    """Upload file with retry logic."""
    for attempt in range(max_retries):
        try:
            await file_obj.save_async()
            return
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(
                f"upload attempt {attempt + 1} failed, retrying",
                extra={"error": str(e)},
            )
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
Testing Configuration¶
Test Fixtures¶
import pytest

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend


@pytest.fixture(autouse=True)
def configure_test_storage():
    """Configure memory storage for all tests."""
    backend = FSSpecBackend(key="test", fs="memory")
    storages.register_backend(backend)
    yield
    storages._backends.clear()
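Tests can then create and save files against the "test" key without touching real storage. A short sketch, assuming an asyncio-enabled pytest setup such as pytest-asyncio:

import pytest

from advanced_alchemy.types import FileObject


@pytest.mark.asyncio
async def test_file_round_trip() -> None:
    """Save and read back a file using the in-memory test backend."""
    file_obj = FileObject(
        backend="test",
        filename="report.txt",
        content=b"hello world",
    )
    await file_obj.save_async()

    assert await file_obj.get_content_async() == b"hello world"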
Environment-Specific Tests¶
import os

import pytest

from advanced_alchemy.types.file_object import storages


@pytest.fixture
def configure_storage_for_environment():
    """Configure storage based on test environment."""
    if os.environ.get("USE_REAL_S3") == "true":
        # Integration tests with real S3
        from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

        storages.register_backend(ObstoreBackend(
            key="test",
            fs="s3://test-bucket/",
            aws_region="us-west-2",
        ))
    else:
        # Unit tests with memory storage
        from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

        storages.register_backend(FSSpecBackend(
            key="test",
            fs="memory",
        ))
    yield
    storages._backends.clear()
Migration Strategies¶
Gradual Migration¶
# Register both old and new backends
from uuid import UUID

from advanced_alchemy.types.file_object import storages


def configure_migration_backends():
    """Configure both local and S3 for gradual migration."""
    # Old backend (local)
    from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

    storages.register_backend(FSSpecBackend(
        key="local-legacy",
        fs="file",
        prefix="/var/app/uploads",
    ))

    # New backend (S3)
    from advanced_alchemy.types.file_object.backends.obstore import ObstoreBackend

    storages.register_backend(ObstoreBackend(
        key="s3-new",
        fs="s3://new-uploads/",
        aws_region="us-west-2",
    ))


# Use feature flag or gradual rollout
def get_storage_backend(user_id: UUID) -> str:
    """Determine storage backend for user."""
    if is_migrated_user(user_id):  # application-specific feature flag
        return "s3-new"
    return "local-legacy"
Data Migration Script¶
import logging

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from advanced_alchemy.types import FileObject

logger = logging.getLogger(__name__)


async def migrate_files_to_s3(session: AsyncSession):
    """Migrate files from local to S3."""
    stmt = select(Document).where(Document.file.isnot(None))
    result = await session.execute(stmt)
    documents = list(result.scalars())

    for doc in documents:
        if doc.file.backend == "local-legacy":
            # Get file content
            content = await doc.file.get_content_async()

            # Create new file on S3
            new_file = FileObject(
                backend="s3-new",
                filename=doc.file.filename,
                content_type=doc.file.content_type,
                metadata=doc.file.metadata,
                content=content,
            )

            # Save to S3
            await new_file.save_async()

            # Delete old file
            await doc.file.delete_async()

            # Update document
            doc.file = new_file
            await session.commit()

            logger.info(f"migrated file: {doc.file.filename}")
See Also¶
Storage Backends - Storage backend overview
FSSpec Backend - FSSpec backend details
Obstore Backend - Obstore backend details
File Storage - FileObject type documentation