1. ORM vs SQL pur

SQLAlchemy propose deux approches complémentaires : l'ORM (Object-Relational Mapping) qui travaille avec des objets Python, et Core qui travaille avec des expressions SQL constructibles.

CritèreORMSQL Core / pur
Niveau d'abstractionHaut — objets PythonBas — expressions SQL
LisibilitéTrès lisiblePlus verbeux
PerformanceOverhead faibleOptimal
ComplexitéSimple pour le CRUDPuissant pour requêtes complexes
MigrationAlembic intégréManuel ou Alembic
Recommandé pourApplications CRUDReporting, analytics
from __future__ import annotations

# ── ORM : travailler avec des objets
from sqlalchemy.orm import Session

def get_user_orm(db: Session, user_id: int):
    return db.get(User, user_id)   # User = classe Python

# ── Core : construire du SQL programmatiquement
from sqlalchemy import select, text

def get_user_core(db: Session, user_id: int):
    stmt = select(User).where(User.id == user_id)
    return db.execute(stmt).scalar_one_or_none()

# ── SQL brut (quand vraiment nécessaire)
def count_tasks_raw(db: Session) -> int:
    result = db.execute(text("SELECT COUNT(*) FROM tasks"))
    return result.scalar()

2. Engine & Session

L'Engine gère le pool de connexions. La Session est l'unité de travail — elle suit les changements et les écrit en DB lors du commit.

from __future__ import annotations

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# ── Création de l'engine (SQLite pour le dev, PostgreSQL en prod)
DATABASE_URL = "sqlite:///./taskapi.db"
# DATABASE_URL = "postgresql+psycopg2://user:pass@localhost/taskapi"

engine = create_engine(
    DATABASE_URL,
    # SQLite seulement : autorise l'accès multi-thread
    connect_args={"check_same_thread": False},
    # Pool de connexions (PostgreSQL)
    # pool_size=5, max_overflow=10
    echo=False,   # True = affiche les requêtes SQL en console
)

# ── Factory de sessions
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,   # ← toujours False : gérer les commits manuellement
    autoflush=False,    # ← ne pas écrire avant le commit
)

# ── Base déclarative (partagée par tous les modèles)
class Base(DeclarativeBase):
    pass

# ── Dépendance FastAPI : fournit une session par requête
def get_db():
    db = SessionLocal()
    try:
        yield db          # la session est disponible dans la route
    finally:
        db.close()        # toujours fermée, même en cas d'erreur

# ── Créer les tables (dev uniquement, utiliser Alembic en prod)
# Base.metadata.create_all(bind=engine)

3. Modèles déclaratifs (style 2.x)

SQLAlchemy 2.x introduit les Mapped annotations pour un typage statique fort et une meilleure intégration avec mypy/pyright.

from __future__ import annotations

from datetime import datetime
from typing import Optional

from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    # Mapped[type] = colonne avec annotation de type
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False)
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(default=func.now())

    # Relation inverse (back_populates)
    projects: Mapped[list[Project]] = relationship("Project", back_populates="owner")
    tasks: Mapped[list[Task]] = relationship("Task", back_populates="assignee")

    def __repr__(self) -> str:
        return f"<User id={self.id} email={self.email!r}>"

class Project(Base):
    __tablename__ = "projects"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text)
    owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    created_at: Mapped[datetime] = mapped_column(default=func.now())

    owner: Mapped[User] = relationship("User", back_populates="projects")
    tasks: Mapped[list[Task]] = relationship("Task", back_populates="project", cascade="all, delete-orphan")

4. Relations

SQLAlchemy gère automatiquement les JOIN via relationship(). Choisissez la stratégie de chargement selon vos besoins.

from __future__ import annotations

from sqlalchemy import Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import relationship, selectinload, joinedload

# ── Many-to-Many : table d'association
task_tags = Table(
    "task_tags",
    Base.metadata,
    Column("task_id", Integer, ForeignKey("tasks.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)

class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    project_id: Mapped[int] = mapped_column(ForeignKey("projects.id"))
    assignee_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"))

    # Lazy loading (défaut) — chargé à la première utilisation
    project: Mapped[Project] = relationship("Project", back_populates="tasks")
    assignee: Mapped[User | None] = relationship("User", back_populates="tasks")

    # Many-to-Many via table d'association
    tags: Mapped[list[Tag]] = relationship("Tag", secondary=task_tags, back_populates="tasks")

# ── Stratégies de chargement (dans les queries)
from sqlalchemy.orm import Session

def get_tasks_with_project(db: Session):
    # joinedload : JOIN SQL unique — idéal pour n-à-1
    return db.execute(
        select(Task).options(joinedload(Task.project))
    ).unique().scalars().all()

def get_tasks_with_tags(db: Session):
    # selectinload : SELECT séparé — idéal pour 1-à-n et n-à-n
    return db.execute(
        select(Task).options(selectinload(Task.tags))
    ).unique().scalars().all()

# ── Cascade : supprimer les tâches quand le projet est supprimé
# tasks: Mapped[list[Task]] = relationship("Task", cascade="all, delete-orphan")
N+1 Problem : Accéder à task.project pour chaque tâche sans joinedload génère N requêtes supplémentaires. Utilisez toujours une stratégie de chargement explicite.

5. CRUD style 2.x

SQLAlchemy 2.x préfère le style session.execute(select(...)) plutôt que l'ancien session.query(Model).

from __future__ import annotations

from sqlalchemy import select
from sqlalchemy.orm import Session

# ── CREATE
def create_user(db: Session, email: str, username: str, hashed_password: str) -> User:
    user = User(email=email, username=username, hashed_password=hashed_password)
    db.add(user)
    db.commit()
    db.refresh(user)   # recharge l'objet depuis la DB (id, created_at…)
    return user

# ── READ (un seul)
def get_user_by_id(db: Session, user_id: int) -> User | None:
    return db.get(User, user_id)   # raccourci pour get par PK

def get_user_by_email(db: Session, email: str) -> User | None:
    stmt = select(User).where(User.email == email)
    return db.execute(stmt).scalar_one_or_none()

# ── READ (liste)
def get_users(db: Session, skip: int = 0, limit: int = 100) -> list[User]:
    stmt = select(User).offset(skip).limit(limit)
    return list(db.execute(stmt).scalars().all())

# ── UPDATE
def update_task_status(db: Session, task_id: int, status: str) -> Task | None:
    task = db.get(Task, task_id)
    if task is None:
        return None
    task.status = status      # modifier l'attribut
    db.commit()               # écrire en DB
    db.refresh(task)
    return task

# ── DELETE
def delete_task(db: Session, task_id: int) -> bool:
    task = db.get(Task, task_id)
    if task is None:
        return False
    db.delete(task)
    db.commit()
    return True

# ── BULK operations (2.x style)
from sqlalchemy import update as sa_update, delete as sa_delete

def archive_done_tasks(db: Session) -> int:
    stmt = (
        sa_update(Task)
        .where(Task.status == "done")
        .values(is_archived=True)
        .returning(Task.id)
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount

6. Filtres & jointures

from __future__ import annotations

from sqlalchemy import select, func, and_, or_, desc
from sqlalchemy.orm import Session, joinedload

def get_tasks_by_project(db: Session, project_id: int, status: str | None = None) -> list[Task]:
    stmt = (
        select(Task)
        .where(Task.project_id == project_id)
        .options(joinedload(Task.assignee))
        .order_by(desc(Task.created_at))
    )
    if status:
        stmt = stmt.where(Task.status == status)
    return list(db.execute(stmt).unique().scalars().all())

# ── Filtres complexes avec AND / OR
def search_tasks(db: Session, q: str, statuses: list[str]) -> list[Task]:
    stmt = select(Task).where(
        and_(
            Task.title.ilike(f"%{q}%"),   # LIKE insensible à la casse
            or_(*[Task.status == s for s in statuses]),
        )
    )
    return list(db.execute(stmt).scalars().all())

# ── Agrégations et statistiques
def get_project_stats(db: Session, project_id: int) -> dict:
    stmt = (
        select(
            func.count(Task.id).label("total"),
            func.sum(
                (Task.status == "done").cast(Integer)
            ).label("done_count"),
        )
        .where(Task.project_id == project_id)
    )
    row = db.execute(stmt).one()
    return {
        "total": row.total or 0,
        "done": row.done_count or 0,
        "completion_rate": (row.done_count or 0) / (row.total or 1) * 100,
    }

# ── Jointure explicite
def get_tasks_with_owner_email(db: Session) -> list:
    stmt = (
        select(Task.id, Task.title, User.email)
        .join(Task.assignee)      # JOIN sur la relation
        .where(Task.status != "cancelled")
    )
    return [{"id": r.id, "title": r.title, "email": r.email} for r in db.execute(stmt)]

7. Alembic

Alembic gère les migrations de schéma de base de données de manière versionnée, comme git pour votre DB.

# Installation
pip install alembic

# Initialiser Alembic dans le projet
alembic init alembic

# Modifier alembic/env.py pour pointer sur vos modèles
# target_metadata = Base.metadata
# alembic/env.py — configuration minimale
from __future__ import annotations

from models import Base           # ← importer la Base SQLAlchemy
from database import DATABASE_URL

config.set_main_option("sqlalchemy.url", DATABASE_URL)
target_metadata = Base.metadata   # ← Alembic inspecte les modèles
# Générer une migration automatique (après ajout d'un modèle)
alembic revision --autogenerate -m "create tasks table"

# Appliquer les migrations
alembic upgrade head

# Revenir à la version précédente
alembic downgrade -1

# Voir l'historique
alembic history --verbose

# Voir la version courante
alembic current
# alembic/versions/xxxx_create_tasks_table.py — exemple de migration générée
from __future__ import annotations

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    op.create_table(
        "tasks",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("title", sa.String(200), nullable=False),
        sa.Column("status", sa.String(50), nullable=False, server_default="todo"),
        sa.Column("project_id", sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(["project_id"], ["projects.id"]),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index("ix_tasks_project_id", "tasks", ["project_id"])

def downgrade() -> None:
    op.drop_table("tasks")

8. Transactions

Une transaction garantit l'atomicité : soit toutes les opérations réussissent, soit aucune n'est appliquée.

from __future__ import annotations

from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError

# ── Gestion manuelle des transactions
def transfer_task(db: Session, task_id: int, new_project_id: int, new_assignee_id: int) -> bool:
    try:
        task = db.get(Task, task_id)
        if task is None:
            return False

        # Plusieurs opérations dans une même transaction
        task.project_id = new_project_id
        task.assignee_id = new_assignee_id

        # Créer un commentaire de transfert
        comment = Comment(
            task_id=task_id,
            content=f"Transféré au projet {new_project_id}",
        )
        db.add(comment)

        db.commit()      # ← tout ou rien
        db.refresh(task)
        return True

    except SQLAlchemyError as e:
        db.rollback()    # ← annuler en cas d'erreur
        print(f"Erreur transaction : {e}")
        return False

# ── Context manager (savepoint)
def safe_bulk_create(db: Session, tasks_data: list[dict]) -> list[Task]:
    created = []
    for data in tasks_data:
        try:
            with db.begin_nested():   # savepoint — rollback partiel possible
                task = Task(**data)
                db.add(task)
                db.flush()            # écrire sans commit (valider les contraintes)
                created.append(task)
        except Exception as e:
            print(f"Tâche ignorée : {e}")
            # Seulement cette tâche est annulée, les autres continuent
    db.commit()
    return created
← Module 03 Module 05 — API REST & Auth →