FSSpec Backend¶
Python-based storage backend with broad filesystem support through the fsspec ecosystem.
Characteristics¶
Implementation: Pure Python
Async Support: Via fsspec.asyn wrappers
Supported Backends: 60+ filesystem implementations
Installation:
pip install "advanced-alchemy[fsspec]"
Supported Filesystems¶
fsspec provides implementations for:
Cloud Storage:
Amazon S3 (s3fs)
Google Cloud Storage (gcsfs)
Azure Blob Storage (adlfs)
Dropbox (dropboxdrivefs)
Network Protocols:
SFTP (sshfs)
FTP (ftpfs)
HTTP/HTTPS (http)
WebDAV (webdavfs)
Local & Other:
Local filesystem (file)
In-memory (memory)
GitHub (github)
Archive files (zip, tar)
See fsspec implementations for complete list.
Installation¶
Basic Installation¶
pip install "advanced-alchemy[fsspec]"
Cloud Provider Dependencies¶
# Amazon S3
pip install s3fs
# Google Cloud Storage
pip install gcsfs
# Azure Blob Storage
pip install adlfs
# SFTP
pip install sshfs
Basic Usage¶
Backend Registration¶
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

# Register backend
storages.register_backend(FSSpecBackend(
    key="local",
    fs="file",
    prefix="/var/app/uploads",
))
Using in Models¶
from typing import Optional

from sqlalchemy.orm import Mapped, mapped_column

from advanced_alchemy.base import UUIDAuditBase
from advanced_alchemy.types import FileObject, StoredObject


class Document(UUIDAuditBase):
    __tablename__ = "documents"

    title: "Mapped[str]"
    # backend must match the key used when registering the backend
    file: "Mapped[Optional[FileObject]]" = mapped_column(
        StoredObject(backend="local")
    )
Local Filesystem¶
Basic Setup¶
from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

storages.register_backend(FSSpecBackend(
    key="local",
    fs="file",
    prefix="/var/app/uploads",
))
Auto-Create Directories¶
import fsspec

fs = fsspec.filesystem("file", auto_mkdir=True)

storages.register_backend(FSSpecBackend(
    key="local",
    fs=fs,
    prefix="/var/app/uploads",
))
Amazon S3¶
Basic Configuration¶
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

# Replace the placeholder strings with real credentials
s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
    endpoint_url="https://s3.amazonaws.com",
)

storages.register_backend(FSSpecBackend(
    key="s3-documents",
    fs=s3_fs,
    prefix="my-bucket/documents",
))
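The `key` and `secret` values in this configuration are placeholders. A minimal sketch of reading them from the environment rather than hard-coding them follows; the helper name `s3_kwargs_from_env` is hypothetical (it is not part of advanced-alchemy or fsspec), and `AWS_ENDPOINT_URL` is an assumed variable name for S3-compatible endpoints.

```python
import os
from typing import Mapping, Optional


def s3_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for fsspec.filesystem("s3", ...) from env vars.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    kwargs = {}
    # Pass credentials only when both are set, so an empty environment
    # leaves credential lookup to the S3 filesystem itself.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        kwargs["key"] = env["AWS_ACCESS_KEY_ID"]
        kwargs["secret"] = env["AWS_SECRET_ACCESS_KEY"]
    # Assumed variable name; set it for MinIO, R2, and similar services.
    if env.get("AWS_ENDPOINT_URL"):
        kwargs["endpoint_url"] = env["AWS_ENDPOINT_URL"]
    return kwargs
```

The result can then be unpacked into the call shown above, for example `fsspec.filesystem("s3", **s3_kwargs_from_env())`.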
IAM Role Authentication¶
# Use IAM role (EC2, ECS, Lambda)
s3_fs = fsspec.filesystem("s3")

storages.register_backend(FSSpecBackend(
    key="s3-documents",
    fs=s3_fs,
    prefix="my-bucket/documents",
))
S3-Compatible Services¶
S3-compatible services such as MinIO, DigitalOcean Spaces, and Cloudflare R2 use the same "s3" filesystem with a custom endpoint_url:
# MinIO
minio_fs = fsspec.filesystem(
    "s3",
    key="minioadmin",
    secret="minioadmin",
    endpoint_url="http://localhost:9000",
    use_ssl=False,
)

storages.register_backend(FSSpecBackend(
    key="minio",
    fs=minio_fs,
    prefix="my-bucket",
))

# Cloudflare R2
r2_fs = fsspec.filesystem(
    "s3",
    key="R2_ACCESS_KEY_ID",
    secret="R2_SECRET_ACCESS_KEY",
    endpoint_url="https://account-id.r2.cloudflarestorage.com",
)

storages.register_backend(FSSpecBackend(
    key="r2",
    fs=r2_fs,
    prefix="my-bucket",
))
Google Cloud Storage¶
Service Account¶
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

gcs_fs = fsspec.filesystem(
    "gcs",
    token="/path/to/service-account.json",
    project="your-project-id",
)

storages.register_backend(FSSpecBackend(
    key="gcs-files",
    fs=gcs_fs,
    prefix="my-bucket/files",
))
Default Credentials¶
# Use application default credentials
gcs_fs = fsspec.filesystem("gcs", token="google_default")

storages.register_backend(FSSpecBackend(
    key="gcs-files",
    fs=gcs_fs,
    prefix="my-bucket/files",
))
Azure Blob Storage¶
Connection String¶
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

azure_fs = fsspec.filesystem(
    "abfs",
    connection_string="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net",
)

storages.register_backend(FSSpecBackend(
    key="azure-blobs",
    fs=azure_fs,
    prefix="container/files",
))
Account Key¶
azure_fs = fsspec.filesystem(
    "abfs",
    account_name="mystorageaccount",
    account_key="account-key-here",
)

storages.register_backend(FSSpecBackend(
    key="azure-blobs",
    fs=azure_fs,
    prefix="container/files",
))
SFTP¶
Password Authentication¶
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

sftp_fs = fsspec.filesystem(
    "sftp",
    host="sftp.example.com",
    username="user",
    password="password",
)

storages.register_backend(FSSpecBackend(
    key="sftp-uploads",
    fs=sftp_fs,
    prefix="/remote/path",
))
SSH Key Authentication¶
sftp_fs = fsspec.filesystem(
    "sftp",
    host="sftp.example.com",
    username="user",
    client_keys=["/path/to/private_key"],
)

storages.register_backend(FSSpecBackend(
    key="sftp-uploads",
    fs=sftp_fs,
    prefix="/remote/path",
))
HTTP/HTTPS¶
Public Files¶
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

http_fs = fsspec.filesystem("http")

storages.register_backend(FSSpecBackend(
    key="cdn",
    fs=http_fs,
    prefix="https://cdn.example.com/files",
))
Authenticated¶
http_fs = fsspec.filesystem(
    "http",
    client_kwargs={"headers": {"Authorization": "Bearer token"}},
)

storages.register_backend(FSSpecBackend(
    key="api-storage",
    fs=http_fs,
    prefix="https://api.example.com/storage",
))
Advanced Configuration¶
Custom fsspec Options¶
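import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

# S3 with custom configuration
s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
    # Passed to the underlying botocore client configuration
    config_kwargs={
        "max_pool_connections": 50,
        "connect_timeout": 60,
        "read_timeout": 60,
    },
    use_ssl=True,
    # Extra parameters added to S3 API calls
    s3_additional_kwargs={
        "ServerSideEncryption": "AES256",
    },
)

storages.register_backend(FSSpecBackend(
    key="s3-encrypted",
    fs=s3_fs,
    prefix="my-bucket/encrypted",
))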
Caching¶
fsspec supports caching for remote filesystems:
import fsspec

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend

# Cache remote files locally
s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
)

# Pass either fs or target_protocol to the cache, not both
cached_fs = fsspec.filesystem(
    "filecache",
    cache_storage="/tmp/fsspec_cache",
    fs=s3_fs,
)

storages.register_backend(FSSpecBackend(
    key="s3-cached",
    fs=cached_fs,
    prefix="my-bucket",
))
File Operations¶
Upload Pattern¶
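from typing import Annotated

from litestar import post
from litestar.datastructures import UploadFile
from litestar.enums import RequestEncodingType
from litestar.params import Body

from advanced_alchemy.types import FileObject

# DocumentService, DocumentModel, and DocumentSchema are application-defined.


@post("/upload")
async def upload_file(
    # Declare the body as multipart form data so the upload is parsed as a file
    data: Annotated[UploadFile, Body(media_type=RequestEncodingType.MULTI_PART)],
    service: "DocumentService",
) -> "DocumentSchema":
    """Upload file to fsspec storage."""
    doc = await service.create(
        DocumentModel(
            title=data.filename or "untitled",
            file=FileObject(
                backend="s3-documents",
                filename=data.filename or "file",
                content_type=data.content_type,
                content=await data.read(),
            ),
        )
    )
    return service.to_schema(doc, schema_type=DocumentSchema)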
Download Pattern¶
from uuid import UUID

from litestar import get
from litestar.exceptions import NotFoundException
from litestar.response import Stream


@get("/download/{document_id:uuid}")
async def download_file(
    document_id: UUID,
    service: "DocumentService",
) -> Stream:
    """Download file from fsspec storage."""
    doc = await service.get(document_id)
    if doc.file is None:
        raise NotFoundException("file not found")
    content = await doc.file.get_content_async()
    return Stream(
        # Stream expects an iterator of chunks, so wrap the bytes in one
        content=iter([content]),
        media_type=doc.file.content_type or "application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{doc.file.filename}"'
        },
    )
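The header above interpolates the stored filename directly, which produces a malformed value if the name contains a double quote or non-ASCII characters. A minimal sketch of a safer header builder follows; `content_disposition` is a hypothetical helper, not part of Litestar or advanced-alchemy, and it follows the `filename` plus `filename*` pattern described in RFC 6266.

```python
from urllib.parse import quote


def content_disposition(filename: str) -> str:
    """Return an attachment header value that tolerates quotes and non-ASCII.

    Hypothetical helper for illustration only.
    """
    # ASCII fallback: unencodable characters become "?", and characters
    # that would break the quoted string are replaced with "_".
    fallback = filename.encode("ascii", "replace").decode("ascii")
    fallback = fallback.replace("\\", "_").replace('"', "_")
    # Percent-encoded UTF-8 form for clients that understand filename*.
    encoded = quote(filename, safe="")
    return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"
```

In the handler, the header value could then be built with `content_disposition(doc.file.filename)`.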
Testing¶
In-Memory Backend¶
import pytest

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend


@pytest.fixture
def memory_storage():
    """Configure in-memory fsspec storage."""
    backend = FSSpecBackend(key="test", fs="memory")
    storages.register_backend(backend)
    yield backend
    storages._backends.pop("test", None)


async def test_file_upload(memory_storage):
    """Test file upload with in-memory fsspec."""
    from advanced_alchemy.types import FileObject

    file_obj = FileObject(
        backend="test",
        filename="test.txt",
        content=b"Test content",
    )
    await file_obj.save_async()
    content = await file_obj.get_content_async()
    assert content == b"Test content"
Mock S3 (moto)¶
import fsspec
import pytest
from moto import mock_aws

from advanced_alchemy.types.file_object import storages
from advanced_alchemy.types.file_object.backends.fsspec import FSSpecBackend


@pytest.fixture
def mock_s3():
    """Mock S3 for testing."""
    with mock_aws():
        # Create bucket
        import boto3

        s3 = boto3.client("s3", region_name="us-east-1")
        s3.create_bucket(Bucket="test-bucket")

        # Register fsspec backend
        s3_fs = fsspec.filesystem("s3")
        storages.register_backend(FSSpecBackend(
            key="test-s3",
            fs=s3_fs,
            prefix="test-bucket",
        ))
        yield
        storages._backends.pop("test-s3", None)
Performance Considerations¶
Buffering¶
fsspec reads and writes remote files in blocks, so a larger block size means fewer requests for large files:
import fsspec

# Increase buffer size for large files
s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
    default_block_size=10 * 1024 * 1024,  # 10 MB blocks
)
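To get a feel for what a block size change means, the arithmetic can be sketched as below. This assumes a simple sequential read in which each block costs one request; actual request counts depend on the caching strategy in use, and the 1 GiB file size is a made-up figure for illustration.

```python
def blocks_needed(file_size: int, block_size: int) -> int:
    """Number of block-sized reads needed to cover file_size bytes."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    # Integer ceiling division: a partial final block still costs one read.
    return (file_size + block_size - 1) // block_size


file_size = 1024 * 1024 * 1024  # hypothetical 1 GiB file
small = blocks_needed(file_size, 1 * 1024 * 1024)   # 1 MiB blocks
large = blocks_needed(file_size, 10 * 1024 * 1024)  # 10 MiB blocks
print(small, large)  # 1024 103
```

Under these assumptions, moving from 1 MiB to 10 MiB blocks cuts the number of reads by roughly a factor of ten, at the cost of holding larger buffers in memory.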
Async Operations¶
fsspec provides async wrappers:
# FileObject automatically uses async when available
content = await file_obj.get_content_async() # Uses fsspec async
Connection Pooling¶
Configure connection pools for better performance:
import fsspec

s3_fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
    config_kwargs={"max_pool_connections": 50},
)
Common Issues¶
Import Errors¶
Missing filesystem-specific packages:
# Error: No module named 's3fs'
pip install s3fs
# Error: No module named 'gcsfs'
pip install gcsfs
# Error: No module named 'adlfs'
pip install adlfs
Path Issues¶
Ensure correct path format:
# Correct
prefix="bucket/path" # No leading slash
prefix="/local/absolute/path" # Absolute for local
# Incorrect
prefix="/bucket/path" # Leading slash for cloud
prefix="local/relative/path" # Relative for local (use absolute)
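These rules can be checked at startup before a backend is registered. The sketch below is one way to do that; `check_prefix` and the two protocol sets are hypothetical names introduced for illustration, paths are assumed to be POSIX-style, and protocols outside the two sets (such as SFTP or HTTP) are passed through unchanged.

```python
from pathlib import PurePosixPath

# Assumed groupings for illustration; extend them to match your setup.
BUCKET_PROTOCOLS = {"s3", "gcs", "abfs"}
LOCAL_PROTOCOLS = {"file"}


def check_prefix(protocol: str, prefix: str) -> str:
    """Validate a prefix against the path rules above and return it."""
    if protocol in BUCKET_PROTOCOLS and prefix.startswith("/"):
        raise ValueError(f"{protocol} prefix must not start with '/': {prefix!r}")
    if protocol in LOCAL_PROTOCOLS and not PurePosixPath(prefix).is_absolute():
        raise ValueError(f"local prefix must be absolute: {prefix!r}")
    return prefix


print(check_prefix("s3", "bucket/path"))             # bucket/path
print(check_prefix("file", "/local/absolute/path"))  # /local/absolute/path
```

Calling `check_prefix("s3", "/bucket/path")` or `check_prefix("file", "local/relative/path")` raises `ValueError`, which surfaces the mistake before any file operation is attempted.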
Authentication Failures¶
Verify credentials and permissions:
# Test filesystem directly
import fsspec

fs = fsspec.filesystem(
    "s3",
    key="AWS_ACCESS_KEY_ID",
    secret="AWS_SECRET_ACCESS_KEY",
)

# List bucket contents
files = fs.ls("my-bucket")
print(files)
Migration from Other Backends¶
From Local to S3¶
# Before (local)
storages.register_backend(FSSpecBackend(
    key="files",
    fs="file",
    prefix="/var/app/uploads",
))

# After (S3)
import fsspec

s3_fs = fsspec.filesystem("s3", key="...", secret="...")

storages.register_backend(FSSpecBackend(
    key="files",
    fs=s3_fs,
    prefix="my-bucket/uploads",
))

# Models unchanged - only backend registration changes
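Because the backend key stays the same, the choice between local storage and S3 can be driven by configuration. A minimal sketch, assuming a hypothetical `backend_settings` helper and made-up environment names ("production" versus anything else); the dictionary keys are illustrative and not an advanced-alchemy API:

```python
def backend_settings(environment: str) -> dict:
    """Return settings for the backend registered under the key "files".

    Hypothetical helper; the environment names are assumptions.
    """
    if environment == "production":
        return {"key": "files", "protocol": "s3", "prefix": "my-bucket/uploads"}
    return {"key": "files", "protocol": "file", "prefix": "/var/app/uploads"}


settings = backend_settings("development")
print(settings["protocol"], settings["prefix"])  # file /var/app/uploads
```

The returned values would then feed the registration shown above: build the filesystem from `settings["protocol"]` and pass `settings["key"]` and `settings["prefix"]` to `FSSpecBackend`.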
See Also¶
Storage Backends - Storage backend overview
Obstore Backend - Rust-based alternative backend
Storage Configuration - Advanced configuration
File Storage - FileObject type