Security Types¶
SQLAlchemy types for handling sensitive data with encryption and password hashing.
EncryptedString¶
Type for storing encrypted string values with configurable encryption backends.
Characteristics:
Python type:
strStorage: Encrypted VARCHAR/VARCHAR2
Automatic encryption/decryption
Configurable encryption backend
Configurable maximum length
Basic Usage¶
from sqlalchemy.orm import Mapped, mapped_column
from advanced_alchemy.base import UUIDBase
from advanced_alchemy.types import EncryptedString
class User(UUIDBase):
__tablename__ = "users"
email: "Mapped[str]" = mapped_column(unique=True)
api_key: "Mapped[str]" = mapped_column(
EncryptedString(key="your-encryption-key")
)
ssn: "Mapped[str]" = mapped_column(
EncryptedString(key="your-encryption-key", length=255)
)
Storing Encrypted Data¶
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(session: AsyncSession) -> User:
user = User(
email="user@example.com",
api_key="sk_live_abc123xyz", # Encrypted automatically
)
session.add(user)
await session.commit()
return user
Retrieving Decrypted Data¶
async def get_api_key(session: AsyncSession, user_id: UUID) -> str:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
user = result.scalar_one()
return user.api_key # Decrypted automatically
EncryptedText¶
Type for storing larger encrypted text content (CLOB/TEXT).
Characteristics:
Python type:
strStorage: Encrypted TEXT/CLOB
Automatic encryption/decryption
No length limit (database-dependent)
Basic Usage¶
from sqlalchemy.orm import Mapped, mapped_column
from advanced_alchemy.base import UUIDBase
from advanced_alchemy.types import EncryptedText
class SecureDocument(UUIDBase):
__tablename__ = "secure_documents"
title: "Mapped[str]"
content: "Mapped[str]" = mapped_column(
EncryptedText(key="your-encryption-key")
)
Storing Large Encrypted Content¶
async def create_document(session: AsyncSession, content: str) -> SecureDocument:
document = SecureDocument(
title="Confidential Report",
content=content, # Large text encrypted automatically
)
session.add(document)
await session.commit()
return document
Encryption Backends¶
Two encryption backends are available for EncryptedString and EncryptedText.
FernetBackend (Default)¶
Uses Python’s cryptography library with Fernet encryption.
Characteristics:
Implementation: AES-128 in CBC mode with HMAC
Key format: 32 URL-safe base64-encoded bytes
Platform: Pure Python, works on all databases
from advanced_alchemy.types import EncryptedString
from advanced_alchemy.types.encrypted_string import FernetBackend
# Explicit backend (default)
api_key: "Mapped[str]" = mapped_column(
EncryptedString(
key="your-encryption-key",
backend=FernetBackend,
)
)
Generating Fernet Keys¶
from cryptography.fernet import Fernet
# Generate a new key
encryption_key = Fernet.generate_key()
print(encryption_key) # b'...' (32 bytes, base64-encoded)
# Store securely (environment variable, key management service)
import os
os.environ["ENCRYPTION_KEY"] = encryption_key.decode()
PGCryptoBackend¶
Uses PostgreSQL’s pgcrypto extension for database-side encryption.
Characteristics:
Implementation: PostgreSQL pgcrypto extension
Encryption: Server-side (within PostgreSQL)
Platform: PostgreSQL only
Requirement: pgcrypto extension enabled
from advanced_alchemy.types import EncryptedString
from advanced_alchemy.types.encrypted_string import PGCryptoBackend
# PostgreSQL pgcrypto backend
api_key: "Mapped[str]" = mapped_column(
EncryptedString(
key="your-encryption-key",
backend=PGCryptoBackend,
)
)
Enabling pgcrypto Extension¶
-- PostgreSQL setup
CREATE EXTENSION IF NOT EXISTS pgcrypto;
PasswordHash¶
Type for storing password hashes with automatic hashing and verification.
Characteristics:
Python type:
str(plaintext) orHashedPasswordStorage: Hashed string (VARCHAR/TEXT)
Automatic hashing on assignment
Verification support
Configurable hashing backend
Basic Usage¶
from sqlalchemy.orm import Mapped, mapped_column
from advanced_alchemy.base import UUIDBase
from advanced_alchemy.types import PasswordHash
class User(UUIDBase):
__tablename__ = "users"
email: "Mapped[str]" = mapped_column(unique=True)
password: "Mapped[str]" = mapped_column(PasswordHash)
Storing Passwords¶
async def create_user(session: AsyncSession, email: str, password: str) -> User:
user = User(
email=email,
password=password, # Hashed automatically
)
session.add(user)
await session.commit()
return user
# Password is now hashed in database
# user.password contains hash, not plaintext
Verifying Passwords¶
from advanced_alchemy.types.password_hash import HashedPassword
async def verify_login(
session: AsyncSession,
email: str,
password: str
) -> "Optional[User]":
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
return None
# Verify password
hashed = HashedPassword(user.password)
if hashed.verify(password):
return user
return None
Password Hashing Backends¶
Three password hashing backends are available.
PwdlibHasher (Default)¶
Uses pwdlib library with configurable hashers.
Characteristics:
Implementation: pwdlib (modern password hashing)
Default algorithm: Argon2id
Platform: Pure Python
Installation:
pip install "advanced-alchemy[pwdlib]"
from advanced_alchemy.types import PasswordHash
from advanced_alchemy.types.password_hash.pwdlib import PwdlibHasher
from pwdlib.hashers.argon2 import Argon2Hasher
# Default (Argon2)
password: "Mapped[str]" = mapped_column(PasswordHash)
# Explicit configuration
password: "Mapped[str]" = mapped_column(
PasswordHash(
backend=PwdlibHasher(hasher=Argon2Hasher())
)
)
Argon2Hasher¶
Uses argon2-cffi for Argon2 password hashing.
Characteristics:
Implementation: argon2-cffi (Argon2id)
Algorithm: Argon2id (memory-hard)
Platform: C extension with Python fallback
Installation:
pip install "advanced-alchemy[argon2]"
from advanced_alchemy.types import PasswordHash
from advanced_alchemy.types.password_hash.argon2 import Argon2Hasher
password: "Mapped[str]" = mapped_column(
PasswordHash(backend=Argon2Hasher())
)
PasslibHasher¶
Uses passlib for flexible password hashing.
Characteristics:
Implementation: passlib (legacy support)
Default algorithm: bcrypt
Platform: Pure Python with optional C extensions
Installation:
pip install "advanced-alchemy[passlib]"
from advanced_alchemy.types import PasswordHash
from advanced_alchemy.types.password_hash.passlib import PasslibHasher
password: "Mapped[str]" = mapped_column(
PasswordHash(backend=PasslibHasher())
)
Security Considerations¶
Key Management¶
Encryption keys must be stored securely:
import os
# Environment variable (recommended for deployment)
encryption_key = os.environ["ENCRYPTION_KEY"]
# Key management service (AWS KMS, Google Cloud KMS, Azure Key Vault)
from your_kms import get_encryption_key
encryption_key = get_encryption_key("user-data-encryption")
class User(UUIDBase):
__tablename__ = "users"
api_key: "Mapped[str]" = mapped_column(
EncryptedString(key=encryption_key)
)
Key Rotation¶
Rotating encryption keys requires re-encrypting data:
from advanced_alchemy.types.encrypted_string import FernetBackend
async def rotate_encryption_key(
session: AsyncSession,
old_key: str,
new_key: str
) -> None:
# Define temporary model with old key
old_backend = FernetBackend(key=old_key)
new_backend = FernetBackend(key=new_key)
# Fetch all users
stmt = select(User)
result = await session.execute(stmt)
users = list(result.scalars())
for user in users:
# Decrypt with old key
decrypted = old_backend.decrypt(user.api_key)
# Encrypt with new key
user.api_key = new_backend.encrypt(decrypted)
await session.commit()
Password Policy¶
Enforce password requirements at application level:
import re
def validate_password(password: str) -> bool:
"""Validate password meets security requirements."""
if len(password) < 12:
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"[0-9]", password):
return False
if not re.search(r"[!@#$%^&*]", password):
return False
return True
async def create_user(
session: AsyncSession,
email: str,
password: str
) -> User:
if not validate_password(password):
raise ValueError("password does not meet security requirements")
user = User(email=email, password=password)
session.add(user)
await session.commit()
return user
Common Patterns¶
User Authentication¶
from datetime import datetime, timezone
class User(UUIDAuditBase):
__tablename__ = "users"
email: "Mapped[str]" = mapped_column(unique=True)
password: "Mapped[str]" = mapped_column(PasswordHash)
last_login: "Mapped[Optional[datetime]]" = mapped_column(DateTimeUTC)
async def authenticate(
session: AsyncSession,
email: str,
password: str
) -> "Optional[User]":
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
return None
hashed = HashedPassword(user.password)
if not hashed.verify(password):
return None
# Update last login
user.last_login = datetime.now(timezone.utc)
await session.commit()
return user
from datetime import datetime, UTC
class User(UUIDAuditBase):
__tablename__ = "users"
email: "Mapped[str]" = mapped_column(unique=True)
password: "Mapped[str]" = mapped_column(PasswordHash)
last_login: "Mapped[Optional[datetime]]" = mapped_column(DateTimeUTC)
async def authenticate(
session: AsyncSession,
email: str,
password: str
) -> "Optional[User]":
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
return None
hashed = HashedPassword(user.password)
if not hashed.verify(password):
return None
# Update last login
user.last_login = datetime.now(UTC)
await session.commit()
return user
API Key Management¶
import secrets
class APIKey(UUIDAuditBase):
__tablename__ = "api_keys"
user_id: "Mapped[UUID]" = mapped_column(GUID)
name: "Mapped[str]" # "Production API", "Development API"
key: "Mapped[str]" = mapped_column(EncryptedString(key=ENCRYPTION_KEY))
last_used: "Mapped[Optional[datetime]]" = mapped_column(DateTimeUTC)
async def create_api_key(
session: AsyncSession,
user_id: UUID,
name: str
) -> tuple[APIKey, str]:
# Generate secure random key
key_value = secrets.token_urlsafe(32)
api_key = APIKey(
user_id=user_id,
name=name,
key=key_value,
)
session.add(api_key)
await session.commit()
# Return key to user (only time they'll see it)
return api_key, key_value
async def verify_api_key(session: AsyncSession, key: str) -> "Optional[APIKey]":
stmt = select(APIKey)
result = await session.execute(stmt)
api_keys = list(result.scalars())
for api_key in api_keys:
if api_key.key == key:
# Update last used
api_key.last_used = datetime.now(timezone.utc)
await session.commit()
return api_key
return None
See Also¶
Basic Types - Core SQLAlchemy types
Modeling - Base class integration
types - Complete API reference
Cryptography Documentation - Fernet encryption
pwdlib Documentation - Password hashing