← Formation BDD

🔍 BD05 — MongoDB Avancé & Agrégations

MongoDB 7 Durée : ~2h30 Niveau : intermédiaire

1. Pipeline d'agrégation

Le pipeline transforme les documents en étapes successives. Chaque étape reçoit le résultat de la précédente.

// Structure générale
db.collection.aggregate([
  { $match:   { ... } },   // Filtre (comme find)
  { $group:   { ... } },   // Groupement
  { $sort:    { ... } },   // Tri
  { $project: { ... } },   // Projection / transformation
  { $limit:   N          },
  { $skip:    N          },
  { $lookup:  { ... } },   // JOIN
  { $unwind:  '...'      } // Décompose un tableau
]);

2. Étapes principales

$match et $group

// CA par catégorie pour le mois courant
const result = await db.collection('orders').aggregate([
  {
    $match: {
      status: 'completed',
      createdAt: { $gte: new Date('2024-01-01'), $lt: new Date('2024-02-01') }
    }
  },
  { $unwind: '$items' },
  {
    $group: {
      _id: '$items.category',
      totalRevenue: { $sum: { $multiply: ['$items.price', '$items.qty'] } },
      totalOrders:  { $sum: 1 },
      avgOrderValue: { $avg: '$total' }
    }
  },
  { $sort: { totalRevenue: -1 } },
  { $limit: 10 }
]).toArray();

$project et $addFields

await db.collection('products').aggregate([
  {
    $addFields: {
      discountedPrice: { $multiply: ['$price', 0.9] },
      fullName: { $concat: ['$brand', ' - ', '$name'] },
      isExpensive: { $gt: ['$price', 100] }
    }
  },
  {
    $project: {
      name: 1, price: 1, discountedPrice: 1, fullName: 1, isExpensive: 1
    }
  }
]).toArray();

$bucket et $bucketAuto

// Distribution des prix
await db.collection('products').aggregate([
  {
    $bucket: {
      groupBy: '$price',
      boundaries: [0, 25, 50, 100, 200, 500, Infinity],
      default: 'Other',
      output: {
        count: { $sum: 1 },
        avgPrice: { $avg: '$price' }
      }
    }
  }
]).toArray();

3. $lookup — Jointures

// JOIN simple (équivalent SQL LEFT JOIN)
await db.collection('orders').aggregate([
  {
    $lookup: {
      from: 'users',
      localField: 'userId',
      foreignField: '_id',
      as: 'user'
    }
  },
  { $unwind: { path: '$user', preserveNullAndEmpty: true } },
  {
    $project: {
      orderId: '$_id',
      userEmail: '$user.email',
      total: 1,
      status: 1
    }
  }
]).toArray();

// $lookup avec pipeline (jointure conditionnelle avancée)
await db.collection('products').aggregate([
  {
    $lookup: {
      from: 'reviews',
      let: { productId: '$_id', minScore: 4 },
      pipeline: [
        { $match: { $expr: { $and: [
          { $eq: ['$productId', '$$productId'] },
          { $gte: ['$score', '$$minScore'] }
        ]}}}
      ],
      as: 'goodReviews'
    }
  },
  { $addFields: { goodReviewCount: { $size: '$goodReviews' } } }
]).toArray();

4. Transactions multi-documents

Disponibles depuis MongoDB 4.0+ sur replica set (ou Mongos pour sharded cluster).

const { MongoClient } = require('mongodb');
const client = new MongoClient(process.env.MONGODB_URI);
await client.connect();

const session = client.startSession();
try {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' }
  });

  const orders = client.db('shop').collection('orders');
  const inventory = client.db('shop').collection('inventory');

  const order = await orders.insertOne(
    { userId: 'u1', total: 99.99, status: 'pending', createdAt: new Date() },
    { session }
  );

  const stock = await inventory.findOne({ productId: 'p1' }, { session });
  if (!stock || stock.qty < 1) throw new Error('Stock insuffisant');

  await inventory.updateOne(
    { productId: 'p1' },
    { $inc: { qty: -1 } },
    { session }
  );

  await orders.updateOne(
    { _id: order.insertedId },
    { $set: { status: 'confirmed' } },
    { session }
  );

  await session.commitTransaction();
  console.log('Transaction réussie');
} catch (err) {
  await session.abortTransaction();
  throw err;
} finally {
  await session.endSession();
}

5. Change Streams

Les Change Streams permettent d'écouter les modifications en temps réel sur une collection, une base ou tout le cluster.

// Écouter les insertions et mises à jour
const collection = db.collection('orders');
const changeStream = collection.watch(
  [{ $match: { operationType: { $in: ['insert', 'update'] } } }],
  { fullDocument: 'updateLookup' }
);

changeStream.on('change', (change) => {
  console.log('Changement détecté :', change.operationType);
  if (change.operationType === 'insert') {
    // Notifier via WebSocket, Redis Pub/Sub, etc.
    notifyNewOrder(change.fullDocument);
  }
  if (change.operationType === 'update') {
    const { updatedFields } = change.updateDescription;
    if (updatedFields.status === 'shipped') {
      sendShippingEmail(change.fullDocument);
    }
  }
});

changeStream.on('error', (err) => { /* gestion erreur / reconnexion */ });

// Fermer proprement
process.on('SIGINT', () => changeStream.close());

6. Atlas Search (Full-Text)

// Nécessite MongoDB Atlas avec index de recherche configuré
const results = await db.collection('articles').aggregate([
  {
    $search: {
      index: 'default',
      text: {
        query: 'postgresql performance',
        path: ['title', 'body'],
        fuzzy: { maxEdits: 1 }
      }
    }
  },
  {
    $project: {
      title: 1, summary: 1,
      score: { $meta: 'searchScore' }
    }
  },
  { $sort: { score: -1 } },
  { $limit: 10 }
]).toArray();