1. ORM vs SQL pur
SQLAlchemy propose deux approches complémentaires : l'ORM (Object-Relational Mapping) qui travaille avec des objets Python, et Core qui travaille avec des expressions SQL constructibles.
| Critère | ORM | SQL Core / pur |
|---|---|---|
| Niveau d'abstraction | Haut — objets Python | Bas — expressions SQL |
| Lisibilité | Très lisible | Plus verbeux |
| Performance | Overhead faible | Optimal |
| Complexité | Simple pour le CRUD | Puissant pour requêtes complexes |
| Migration | Alembic intégré | Manuel ou Alembic |
| Recommandé pour | Applications CRUD | Reporting, analytics |
from __future__ import annotations
# ── ORM : travailler avec des objets
from sqlalchemy.orm import Session
def get_user_orm(db: Session, user_id: int):
return db.get(User, user_id) # User = classe Python
# ── Core : construire du SQL programmatiquement
from sqlalchemy import select, text
def get_user_core(db: Session, user_id: int):
stmt = select(User).where(User.id == user_id)
return db.execute(stmt).scalar_one_or_none()
# ── SQL brut (quand vraiment nécessaire)
def count_tasks_raw(db: Session) -> int:
result = db.execute(text("SELECT COUNT(*) FROM tasks"))
return result.scalar()
2. Engine & Session
L'Engine gère le pool de connexions. La Session est l'unité de travail — elle suit les changements et les écrit en DB lors du commit.
from __future__ import annotations
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# ── Création de l'engine (SQLite pour le dev, PostgreSQL en prod)
DATABASE_URL = "sqlite:///./taskapi.db"
# DATABASE_URL = "postgresql+psycopg2://user:pass@localhost/taskapi"
engine = create_engine(
DATABASE_URL,
# SQLite seulement : autorise l'accès multi-thread
connect_args={"check_same_thread": False},
# Pool de connexions (PostgreSQL)
# pool_size=5, max_overflow=10
echo=False, # True = affiche les requêtes SQL en console
)
# ── Factory de sessions
SessionLocal = sessionmaker(
bind=engine,
autocommit=False, # ← toujours False : gérer les commits manuellement
autoflush=False, # ← ne pas écrire avant le commit
)
# ── Base déclarative (partagée par tous les modèles)
class Base(DeclarativeBase):
pass
# ── Dépendance FastAPI : fournit une session par requête
def get_db():
db = SessionLocal()
try:
yield db # la session est disponible dans la route
finally:
db.close() # toujours fermée, même en cas d'erreur
# ── Créer les tables (dev uniquement, utiliser Alembic en prod)
# Base.metadata.create_all(bind=engine)
3. Modèles déclaratifs (style 2.x)
SQLAlchemy 2.x introduit les Mapped annotations pour un typage statique fort et une meilleure intégration avec mypy/pyright.
from __future__ import annotations
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# Mapped[type] = colonne avec annotation de type
id: Mapped[int] = mapped_column(primary_key=True, index=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False)
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(default=func.now())
# Relation inverse (back_populates)
projects: Mapped[list[Project]] = relationship("Project", back_populates="owner")
tasks: Mapped[list[Task]] = relationship("Task", back_populates="assignee")
def __repr__(self) -> str:
return f"<User id={self.id} email={self.email!r}>"
class Project(Base):
__tablename__ = "projects"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text)
owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
created_at: Mapped[datetime] = mapped_column(default=func.now())
owner: Mapped[User] = relationship("User", back_populates="projects")
tasks: Mapped[list[Task]] = relationship("Task", back_populates="project", cascade="all, delete-orphan")
4. Relations
SQLAlchemy gère automatiquement les JOIN via relationship(). Choisissez la stratégie de chargement selon vos besoins.
from __future__ import annotations
from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import relationship, selectinload, joinedload
# ── Many-to-Many : table d'association
task_tags = Table(
"task_tags",
Base.metadata,
Column("task_id", Integer, ForeignKey("tasks.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
class Task(Base):
__tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
project_id: Mapped[int] = mapped_column(ForeignKey("projects.id"))
assignee_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"))
# Lazy loading (défaut) — chargé à la première utilisation
project: Mapped[Project] = relationship("Project", back_populates="tasks")
assignee: Mapped[User | None] = relationship("User", back_populates="tasks")
# Many-to-Many via table d'association
tags: Mapped[list[Tag]] = relationship("Tag", secondary=task_tags, back_populates="tasks")
# ── Stratégies de chargement (dans les queries)
from sqlalchemy.orm import Session
def get_tasks_with_project(db: Session):
# joinedload : JOIN SQL unique — idéal pour n-à-1
return db.execute(
select(Task).options(joinedload(Task.project))
).unique().scalars().all()
def get_tasks_with_tags(db: Session):
# selectinload : SELECT séparé — idéal pour 1-à-n et n-à-n
return db.execute(
select(Task).options(selectinload(Task.tags))
).unique().scalars().all()
# ── Cascade : supprimer les tâches quand le projet est supprimé
# tasks: Mapped[list[Task]] = relationship("Task", cascade="all, delete-orphan")
task.project pour chaque tâche sans joinedload génère N requêtes supplémentaires. Utilisez toujours une stratégie de chargement explicite.
5. CRUD style 2.x
SQLAlchemy 2.x préfère le style session.execute(select(...)) plutôt que l'ancien session.query(Model).
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.orm import Session
# ── CREATE
def create_user(db: Session, email: str, username: str, hashed_password: str) -> User:
user = User(email=email, username=username, hashed_password=hashed_password)
db.add(user)
db.commit()
db.refresh(user) # recharge l'objet depuis la DB (id, created_at…)
return user
# ── READ (un seul)
def get_user_by_id(db: Session, user_id: int) -> User | None:
return db.get(User, user_id) # raccourci pour get par PK
def get_user_by_email(db: Session, email: str) -> User | None:
stmt = select(User).where(User.email == email)
return db.execute(stmt).scalar_one_or_none()
# ── READ (liste)
def get_users(db: Session, skip: int = 0, limit: int = 100) -> list[User]:
stmt = select(User).offset(skip).limit(limit)
return list(db.execute(stmt).scalars().all())
# ── UPDATE
def update_task_status(db: Session, task_id: int, status: str) -> Task | None:
task = db.get(Task, task_id)
if task is None:
return None
task.status = status # modifier l'attribut
db.commit() # écrire en DB
db.refresh(task)
return task
# ── DELETE
def delete_task(db: Session, task_id: int) -> bool:
task = db.get(Task, task_id)
if task is None:
return False
db.delete(task)
db.commit()
return True
# ── BULK operations (2.x style)
from sqlalchemy import update as sa_update, delete as sa_delete
def archive_done_tasks(db: Session) -> int:
stmt = (
sa_update(Task)
.where(Task.status == "done")
.values(is_archived=True)
.returning(Task.id)
)
result = db.execute(stmt)
db.commit()
return result.rowcount
6. Filtres & jointures
from __future__ import annotations
from sqlalchemy import select, func, and_, or_, desc
from sqlalchemy.orm import Session, joinedload
def get_tasks_by_project(db: Session, project_id: int, status: str | None = None) -> list[Task]:
stmt = (
select(Task)
.where(Task.project_id == project_id)
.options(joinedload(Task.assignee))
.order_by(desc(Task.created_at))
)
if status:
stmt = stmt.where(Task.status == status)
return list(db.execute(stmt).unique().scalars().all())
# ── Filtres complexes avec AND / OR
def search_tasks(db: Session, q: str, statuses: list[str]) -> list[Task]:
stmt = select(Task).where(
and_(
Task.title.ilike(f"%{q}%"), # LIKE insensible à la casse
or_(*[Task.status == s for s in statuses]),
)
)
return list(db.execute(stmt).scalars().all())
# ── Agrégations et statistiques
def get_project_stats(db: Session, project_id: int) -> dict:
stmt = (
select(
func.count(Task.id).label("total"),
func.sum(
(Task.status == "done").cast(Integer)
).label("done_count"),
)
.where(Task.project_id == project_id)
)
row = db.execute(stmt).one()
return {
"total": row.total or 0,
"done": row.done_count or 0,
"completion_rate": (row.done_count or 0) / (row.total or 1) * 100,
}
# ── Jointure explicite
def get_tasks_with_owner_email(db: Session) -> list:
stmt = (
select(Task.id, Task.title, User.email)
.join(Task.assignee) # JOIN sur la relation
.where(Task.status != "cancelled")
)
return [{"id": r.id, "title": r.title, "email": r.email} for r in db.execute(stmt)]
7. Alembic
Alembic gère les migrations de schéma de base de données de manière versionnée, comme git pour votre DB.
# Installation
pip install alembic
# Initialiser Alembic dans le projet
alembic init alembic
# Modifier alembic/env.py pour pointer sur vos modèles
# target_metadata = Base.metadata
# alembic/env.py — configuration minimale
from __future__ import annotations
from models import Base # ← importer la Base SQLAlchemy
from database import DATABASE_URL
config.set_main_option("sqlalchemy.url", DATABASE_URL)
target_metadata = Base.metadata # ← Alembic inspecte les modèles
# Générer une migration automatique (après ajout d'un modèle)
alembic revision --autogenerate -m "create tasks table"
# Appliquer les migrations
alembic upgrade head
# Revenir à la version précédente
alembic downgrade -1
# Voir l'historique
alembic history --verbose
# Voir la version courante
alembic current
# alembic/versions/xxxx_create_tasks_table.py — exemple de migration générée
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
op.create_table(
"tasks",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(200), nullable=False),
sa.Column("status", sa.String(50), nullable=False, server_default="todo"),
sa.Column("project_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(["project_id"], ["projects.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_tasks_project_id", "tasks", ["project_id"])
def downgrade() -> None:
op.drop_table("tasks")
8. Transactions
Une transaction garantit l'atomicité : soit toutes les opérations réussissent, soit aucune n'est appliquée.
from __future__ import annotations
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
# ── Gestion manuelle des transactions
def transfer_task(db: Session, task_id: int, new_project_id: int, new_assignee_id: int) -> bool:
try:
task = db.get(Task, task_id)
if task is None:
return False
# Plusieurs opérations dans une même transaction
task.project_id = new_project_id
task.assignee_id = new_assignee_id
# Créer un commentaire de transfert
comment = Comment(
task_id=task_id,
content=f"Transféré au projet {new_project_id}",
)
db.add(comment)
db.commit() # ← tout ou rien
db.refresh(task)
return True
except SQLAlchemyError as e:
db.rollback() # ← annuler en cas d'erreur
print(f"Erreur transaction : {e}")
return False
# ── Context manager (savepoint)
def safe_bulk_create(db: Session, tasks_data: list[dict]) -> list[Task]:
created = []
for data in tasks_data:
try:
with db.begin_nested(): # savepoint — rollback partiel possible
task = Task(**data)
db.add(task)
db.flush() # écrire sans commit (valider les contraintes)
created.append(task)
except Exception as e:
print(f"Tâche ignorée : {e}")
# Seulement cette tâche est annulée, les autres continuent
db.commit()
return created