1. field_validator

Les @field_validator permettent de valider et de transformer un champ individuel, soit après la validation de type (mode="after", par défaut), soit avant (mode="before").

from __future__ import annotations

from datetime import datetime
from pydantic import BaseModel, Field, field_validator

class TaskCreate(BaseModel):
    """Payload for creating a task.

    The title is stripped and returned in Title Case, tags are lower-cased
    and stripped, and ``due_date`` (when provided) must lie in the future.
    """

    title: str = Field(..., min_length=3, max_length=200)
    due_date: datetime | None = None
    tags: list[str] = Field(default_factory=list)

    # mode="before": runs BEFORE type validation, so it receives the raw input.
    @field_validator("title", mode="before")
    @classmethod
    def strip_title(cls, v: object) -> str:
        """Strip surrounding whitespace; non-string input is converted with ``str()``."""
        if isinstance(v, str):
            return v.strip()
        return str(v)

    # mode="after" (default): runs AFTER type validation.
    @field_validator("title")
    @classmethod
    def title_not_empty(cls, v: str) -> str:
        """Reject an empty title and return it in Title Case.

        Raises:
            ValueError: if the title is empty.
        """
        if not v:
            raise ValueError("Le titre ne peut pas être vide après nettoyage")
        return v.title()  # Title Case

    # Pydantic v2 no longer supports `each_item=True`: the validator receives
    # the whole list, so every tag is normalised explicitly here.
    @field_validator("tags")
    @classmethod
    def tag_lowercase(cls, v: list[str]) -> list[str]:
        """Return the tags lower-cased and stripped of surrounding whitespace."""
        return [tag.lower().strip() for tag in v]

    # Validator on due_date
    @field_validator("due_date")
    @classmethod
    def due_date_must_be_future(cls, v: datetime | None) -> datetime | None:
        """Ensure ``due_date``, when given, is strictly in the future.

        Raises:
            ValueError: if the date is now or in the past.
        """
        if v is None:
            return v
        # Take "now" in the value's own timezone (naive stays naive): comparing
        # an aware datetime with a naive one would raise TypeError.
        if v <= datetime.now(tz=v.tzinfo):
            raise ValueError("La date d'échéance doit être dans le futur")
        return v

# ── Test
task = TaskCreate(
    title="  ma tâche  ",
    tags=["PYTHON", "  API  "],
)
# Prints "Ma Tâche", then ["python", "api"] on the next line.
print(task.title, task.tags, sep="\n")

2. model_validator

@model_validator valide l'objet entier, utile pour les règles inter-champs.

from __future__ import annotations

from datetime import datetime
from pydantic import BaseModel, Field, model_validator

# Date interval whose end must follow its start and whose length is capped
# by max_duration_days.
class DateRange(BaseModel):
    start_date: datetime
    end_date: datetime
    max_duration_days: int = Field(365, ge=1)

    @model_validator(mode="after")
    def check_date_order(self) -> DateRange:
        """Check the ordering and the maximum length of the range.

        Raises:
            ValueError: if ``end_date`` is not strictly after ``start_date``,
                or if the span in whole days exceeds ``max_duration_days``.
        """
        start, end = self.start_date, self.end_date
        if end <= start:
            raise ValueError("end_date doit être après start_date")

        # timedelta.days counts whole days only; a trailing partial day is ignored.
        elapsed_days = (end - start).days
        allowed_days = self.max_duration_days
        if elapsed_days > allowed_days:
            raise ValueError(f"Durée max dépassée : {elapsed_days} > {allowed_days} jours")
        return self

class PasswordChange(BaseModel):
    """Password-change request.

    The new password must be confirmed and must differ from the current one.
    """

    current_password: str
    new_password: str = Field(..., min_length=8)
    confirm_password: str

    @model_validator(mode="after")
    def passwords_match(self) -> PasswordChange:
        """Cross-field checks on the validated model.

        Raises:
            ValueError: if the confirmation differs from the new password, or
                if the new password equals the current one.
        """
        if self.new_password != self.confirm_password:
            raise ValueError("Les mots de passe ne correspondent pas")
        if self.new_password == self.current_password:
            raise ValueError("Le nouveau mot de passe doit être différent de l'ancien")
        return self

    @model_validator(mode="before")
    @classmethod
    def check_required_fields(cls, data: object) -> object:
        """Reject a missing or empty ``current_password`` before type validation.

        Raises:
            ValueError: if ``data`` is a dict without a truthy
                ``current_password`` entry.
        """
        # mode="before" receives the raw input, which is not guaranteed to be a
        # dict; only inspect it when it is one and let normal validation deal
        # with anything else.
        if isinstance(data, dict) and not data.get("current_password"):
            raise ValueError("current_password est obligatoire")
        return data

3. Types personnalisés

from __future__ import annotations

from pydantic import BaseModel, EmailStr, constr, confloat, conint, AnyHttpUrl

# ── Types contraints (Pydantic v2)
# User-registration payload declared with pydantic's constrained-type helpers.
# NOTE(review): the pydantic v2 docs discourage constr/conint/confloat in favour
# of Annotated[...] with Field/StringConstraints — consider migrating.
class UserCreate(BaseModel):
    email: EmailStr                                          # validated e-mail address
    username: constr(min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")  # type: ignore[valid-type]
    age: conint(ge=13, le=120) | None = None                 # type: ignore[valid-type]
    score: confloat(ge=0.0, le=100.0) | None = None          # type: ignore[valid-type]
    website: AnyHttpUrl | None = None                        # valid http(s) URL

# ── Annotated (style moderne Pydantic v2)
from typing import Annotated
from pydantic import AfterValidator, BeforeValidator

def validate_phone(v: str) -> str:
    """Normalise a phone number to ``+`` followed by digits.

    Every character other than digits and ``+`` (spaces, dots, dashes, ...)
    is dropped. The result must start with a single ``+``, contain only ASCII
    digits after it, and be at least 10 characters long.

    Raises:
        ValueError: if the cleaned value does not have that shape.
    """
    cleaned = "".join(c for c in v if c.isdigit() or c == "+")
    digits = cleaned[1:]
    # str.isdigit() also accepts non-ASCII digits (e.g. Arabic-Indic), and the
    # filter above keeps a "+" wherever it appears; require a single leading
    # "+" followed exclusively by ASCII digits.
    if (
        not cleaned.startswith("+")
        or len(cleaned) < 10
        or not (digits.isascii() and digits.isdigit())
    ):
        raise ValueError("Numéro de téléphone invalide")
    return cleaned

def _strip_if_str(v: object) -> object:
    """Strip surrounding whitespace from strings; return other values unchanged.

    Calling ``str.strip`` directly on a non-string raises TypeError, which
    pydantic does not turn into a validation error. Passing the value through
    lets the subsequent ``str`` validation report it instead.
    """
    return v.strip() if isinstance(v, str) else v


# str normalised by validate_phone once type validation has succeeded.
PhoneNumber = Annotated[str, AfterValidator(validate_phone)]
# str whose surrounding whitespace is removed before type validation.
TrimmedStr = Annotated[str, BeforeValidator(_strip_if_str)]

# Contact payload built from the annotated aliases defined above the class.
class ContactCreate(BaseModel):
    name: TrimmedStr    # str with a BeforeValidator that strips whitespace
    phone: PhoneNumber  # str with validate_phone applied as an AfterValidator
    email: EmailStr

contact = ContactCreate(
    name="  Alice  ",
    phone="+33612345678",
    email="alice@test.com",
)
print(contact.name)   # "Alice" (stripped)

4. model_dump & sérialisation

from __future__ import annotations

from datetime import datetime
from pydantic import BaseModel, Field

# Response schema describing a task.
class TaskResponse(BaseModel):
    id: int
    title: str
    status: str
    created_at: datetime
    _internal_score: float = 0.0   # private attribute (leading underscore): not serialised

# Sample instance reused by the serialisation examples that follow.
task = TaskResponse(id=1, title="Test", status="todo", created_at=datetime.now())

# ── model_dump: plain Python dict
print(task.model_dump())
# → {"id": 1, "title": "Test", "status": "todo", "created_at": datetime(...)}

# Exclude fields
print(task.model_dump(exclude={"created_at"}))

# N'inclure que les champs fournis (pas les defaults)
# Partial-update schema: every field is optional and defaults to None.
class TaskUpdate(BaseModel):
    title: str | None = None
    status: str | None = None

# Only `status` is passed explicitly, so exclude_unset drops `title`.
update = TaskUpdate(status="done")
print(update.model_dump(exclude_unset=True))
# → {"status": "done"}  (title is omitted because it was never set)

# ── model_dump_json: JSON string
# NOTE(review): `json` is not used by the code visible in this snippet.
import json
print(task.model_dump_json())

# ── model_dump with mode="json" (datetime is serialised as an ISO string)
data = task.model_dump(mode="json")
print(type(data["created_at"]))   # str (ISO 8601)

# ── model_validate: build a model from a dict
# NOTE(review): only the dict case is shown. DeclarativeBase is imported but
# unused, and TaskResponse sets no from_attributes config, which validating an
# ORM object by attribute would presumably require — confirm.
from sqlalchemy.orm import DeclarativeBase
task2 = TaskResponse.model_validate({"id": 2, "title": "Test 2", "status": "done", "created_at": datetime.now()})

5. Génériques PaginatedResponse[T]

from __future__ import annotations

import math
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")

class PaginatedResponse(BaseModel, Generic[T]):
    """Generic paginated response, usable with any item schema."""

    items: list[T]
    total: int
    page: int
    limit: int
    pages: int
    has_next: bool
    has_prev: bool

    @classmethod
    def create(
        cls,
        items: list[T],
        total: int,
        page: int,
        limit: int,
    ) -> "PaginatedResponse[T]":
        """Build a page and derive ``pages``, ``has_next`` and ``has_prev``.

        Args:
            items: the items of the current page.
            total: total number of items across all pages.
            page: 1-based index of the current page.
            limit: maximum number of items per page; must be at least 1.

        Returns:
            The populated response. ``pages`` is 1 when ``total`` is 0.

        Raises:
            ValueError: if ``limit`` is lower than 1.
        """
        # A zero limit used to fail with ZeroDivisionError and a negative one
        # silently produced a negative page count.
        if limit < 1:
            raise ValueError("limit doit être supérieur ou égal à 1")
        # Integer ceiling division: exact for any int, with no float rounding.
        pages = -(-total // limit) if total > 0 else 1
        return cls(
            items=items, total=total, page=page, limit=limit,
            pages=pages, has_next=page < pages, has_prev=page > 1,
        )

# ── Usage avec FastAPI
from pydantic import BaseModel as PM

# Minimal task schema used as the item type in the pagination example.
class TaskResponse(PM):
    id: int
    title: str

# Minimal user schema used as the item type in the pagination example.
class UserResponse(PM):
    id: int
    email: str

# response_model=PaginatedResponse[TaskResponse]
# response_model=PaginatedResponse[UserResponse]

# ── Test
# Five sample tasks; the first page holds two of them.
tasks = [TaskResponse(id=number, title=f"Task {number}") for number in range(1, 6)]
page = PaginatedResponse[TaskResponse].create(
    tasks[:2],
    total=5,
    page=1,
    limit=2,
)
print(page.model_dump_json(indent=2))

6. OWASP Top 10 — Injection, IDOR, XSS

from __future__ import annotations

# ── A03 Injection SQL → ORM ou paramètres liés
from sqlalchemy import select, text
from sqlalchemy.orm import Session

# ❌ DANGEREUX — injection SQL possible
def get_user_bad(db: Session, email: str):
    """Deliberately vulnerable counter-example — do NOT reuse.

    ``email`` is interpolated into the SQL text with an f-string, so a crafted
    value can change the structure of the query (SQL injection).
    """
    return db.execute(text(f"SELECT * FROM users WHERE email = '{email}'")).fetchone()

# ✅ SÉCURISÉ — paramètre lié (transmis séparément de la requête, jamais interpolé dans le SQL)
def get_user_safe(db: Session, email: str):
    """Return the first ``users`` row whose email matches, or None.

    The value is passed as the bound parameter ``:email`` rather than being
    formatted into the SQL text.
    """
    query = text("SELECT * FROM users WHERE email = :email")
    params = {"email": email}
    return db.execute(query, params).fetchone()

# ✅ ENCORE MIEUX — ORM (pas de SQL brut)
def get_user_orm(db: Session, email: str):
    """Return the ``User`` whose email matches, or None, using a ``select()`` construct."""
    statement = select(User).where(User.email == email)
    result = db.execute(statement)
    return result.scalar_one_or_none()

# ── A01 Contrôle d'accès défaillant — IDOR
# ❌ DANGEREUX — n'importe qui peut accéder à n'importe quel projet
# GET /api/projects/42  → retourne le projet 42 sans vérifier l'ownership

# ✅ SÉCURISÉ — toujours filtrer par l'utilisateur courant
def get_project_safe(db: Session, project_id: int, current_user_id: int):
    """Return the project only if it belongs to the current user.

    Raises:
        HTTPException: 404 when no project has this id, 403 when the project
            exists but ``owner_id`` differs from ``current_user_id``.
    """
    found = db.get(Project, project_id)
    if found is None:
        raise HTTPException(404, "Projet introuvable")

    owned_by_caller = found.owner_id == current_user_id
    if not owned_by_caller:
        # NOTE(review): answering 403 here while unknown ids get 404 lets a
        # caller tell existing projects from missing ones. If existence must
        # stay hidden, returning 404 in both cases is the usual choice —
        # confirm the intent.
        raise HTTPException(403, "Accès refusé")
    return found

# ── A03 XSS → échapper les sorties HTML
import html

def render_safe(user_input: str) -> str:
    """Return ``user_input`` with HTML special characters (quotes included) escaped."""
    escaped = html.escape(user_input, quote=True)   # <script> → &lt;script&gt;
    return escaped

# En FastAPI/JSON : pas de risque XSS direct (JSON n'est pas du HTML)
# Attention si vous générez du HTML côté serveur avec Jinja2

7. BaseSettings

BaseSettings de pydantic-settings lit automatiquement les variables d'environnement et, lorsque env_file est configuré, le fichier .env.

from __future__ import annotations

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

# Application configuration loaded through pydantic-settings.
class Settings(BaseSettings):
    # Environment-backed values (priority: env > .env > default)
    app_name: str = "TaskAPI"
    app_version: str = "1.0.0"
    debug: bool = False

    # secret_key has no default (Field(...)), so a value must be supplied;
    # database_url falls back to a local SQLite file.
    secret_key: str = Field(..., description="JWT secret key")
    database_url: str = Field("sqlite:///./taskapi.db")

    # Optional settings
    redis_url: str | None = None
    allowed_hosts: list[str] = ["localhost", "127.0.0.1"]
    max_request_size_mb: int = 10
    log_level: str = "INFO"

    model_config = SettingsConfigDict(
        env_file=".env",          # load values from .env
        env_file_encoding="utf-8",
        case_sensitive=False,     # DATABASE_URL = database_url
        extra="ignore",           # ignore unknown variables
    )

# ── Singleton (chargé une seule fois)
from functools import lru_cache

@lru_cache
def get_settings() -> Settings:
    """Return the application settings; lru_cache reuses the first instance built."""
    settings = Settings()
    return settings

# ── Usage dans FastAPI
from fastapi import Depends

def get_db_url(settings: Settings = Depends(get_settings)) -> str:
    """Return the configured database URL; ``settings`` is injected through ``Depends``."""
    database_url = settings.database_url
    return database_url

8. Headers de sécurité

from __future__ import annotations

from fastapi import FastAPI, Request, Response
# BaseHTTPMiddleware is provided by Starlette; FastAPI ships no
# `fastapi.middleware.base` module, so the previous import failed.
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Add the security headers recommended by OWASP to every response."""

    async def dispatch(self, request: Request, call_next) -> Response:
        """Call the downstream handler, then set the security headers on its response."""
        response = await call_next(request)

        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"
        # Prevent MIME sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        # Disable the legacy browser XSS auditor: it is deprecated and OWASP
        # recommends turning it off ("0") and relying on the CSP below.
        response.headers["X-XSS-Protection"] = "0"
        # Enforce HTTPS for one year
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        # Content Security Policy
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        # Hide the server implementation
        response.headers["Server"] = "TaskAPI"
        # Limit the referrer information sent to other origins
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        return response

# Create the application and register the security-headers middleware on it.
app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)

9. Tests pytest

from __future__ import annotations

import pytest
from fastapi.testclient import TestClient
from pydantic import ValidationError

# ── Test unitaire d'un schéma Pydantic
def test_task_create_valid():
    """A valid title is accepted and comes back in Title Case."""
    from schemas import TaskCreate

    payload = {"title": "Apprendre pytest", "project_id": 1}
    created = TaskCreate(**payload)
    assert created.title == "Apprendre Pytest"   # Title Case applied

def test_task_create_invalid_title():
    """A two-character title is rejected with an error located on ``title``."""
    from schemas import TaskCreate

    with pytest.raises(ValidationError) as exc_info:
        TaskCreate(title="Ab", project_id=1)   # too short

    locations = [str(error["loc"]) for error in exc_info.value.errors()]
    assert any("title" in location for location in locations)

# ── Test d'intégration avec TestClient
from main import app

@pytest.fixture
def client():
    """Provide a TestClient bound to the application under test."""
    test_client = TestClient(app)
    return test_client

def test_register_success(client):
    """Registering returns 201 with an id and never echoes the password."""
    resp = client.post("/auth/register", json={
        "email": "test@test.com",
        "username": "testuser",
        "password": "TestPass123",
    })
    assert resp.status_code == 201
    body = resp.json()
    assert "id" in body
    # Neither the clear-text password nor its hash may be exposed; the original
    # assertion only covered the "password" key.
    assert "password" not in body
    assert "hashed_password" not in body

def test_register_weak_password(client):
    """A password shorter than 8 characters is rejected with 422."""
    payload = {
        "email": "weak@test.com",
        "username": "weak",
        "password": "123",   # too short (< 8 chars)
    }
    response = client.post("/auth/register", json=payload)
    assert response.status_code == 422   # Unprocessable Entity

def test_protected_route_without_token(client):
    """Calling the protected route without a token yields 401."""
    status = client.get("/api/me").status_code
    assert status == 401

def test_full_auth_flow(client):
    """Register, log in, then read the protected profile with the issued token."""
    email, password = "flow@test.com", "FlowPass123"

    # Register
    client.post("/auth/register", json={"email": email, "username": "flow", "password": password})

    # Login (form-encoded: the e-mail is sent as `username`)
    login = client.post("/auth/login", data={"username": email, "password": password})
    assert login.status_code == 200
    token = login.json()["access_token"]

    # Protected route
    auth_headers = {"Authorization": f"Bearer {token}"}
    me = client.get("/api/me", headers=auth_headers)
    assert me.status_code == 200
    assert me.json()["email"] == email
← Module 05 Module 07 — Django →