🔍 BD05 — MongoDB Avancé & Agrégations
1. Pipeline d'agrégation
Le pipeline transforme les documents en étapes successives. Chaque étape reçoit le résultat de la précédente.
// Structure générale
db.collection.aggregate([
{ $match: { ... } }, // Filtre (comme find)
{ $group: { ... } }, // Groupement
{ $sort: { ... } }, // Tri
{ $project: { ... } }, // Projection / transformation
{ $limit: N },
{ $skip: N },
{ $lookup: { ... } }, // JOIN
{ $unwind: '...' } // Décompose un tableau
]);
2. Étapes principales
$match et $group
// CA par catégorie pour le mois courant
const result = await db.collection('orders').aggregate([
{
$match: {
status: 'completed',
createdAt: { $gte: new Date('2024-01-01'), $lt: new Date('2024-02-01') }
}
},
{ $unwind: '$items' },
{
$group: {
_id: '$items.category',
totalRevenue: { $sum: { $multiply: ['$items.price', '$items.qty'] } },
totalOrders: { $sum: 1 },
avgOrderValue: { $avg: '$total' }
}
},
{ $sort: { totalRevenue: -1 } },
{ $limit: 10 }
]).toArray();
$project et $addFields
await db.collection('products').aggregate([
{
$addFields: {
discountedPrice: { $multiply: ['$price', 0.9] },
fullName: { $concat: ['$brand', ' - ', '$name'] },
isExpensive: { $gt: ['$price', 100] }
}
},
{
$project: {
name: 1, price: 1, discountedPrice: 1, fullName: 1, isExpensive: 1
}
}
]).toArray();
$bucket et $bucketAuto
// Distribution des prix
await db.collection('products').aggregate([
{
$bucket: {
groupBy: '$price',
boundaries: [0, 25, 50, 100, 200, 500, Infinity],
default: 'Other',
output: {
count: { $sum: 1 },
avgPrice: { $avg: '$price' }
}
}
}
]).toArray();
3. $lookup — Jointures
// JOIN simple (équivalent SQL LEFT JOIN)
await db.collection('orders').aggregate([
{
$lookup: {
from: 'users',
localField: 'userId',
foreignField: '_id',
as: 'user'
}
},
{ $unwind: { path: '$user', preserveNullAndEmpty: true } },
{
$project: {
orderId: '$_id',
userEmail: '$user.email',
total: 1,
status: 1
}
}
]).toArray();
// $lookup avec pipeline (jointure conditionnelle avancée)
await db.collection('products').aggregate([
{
$lookup: {
from: 'reviews',
let: { productId: '$_id', minScore: 4 },
pipeline: [
{ $match: { $expr: { $and: [
{ $eq: ['$productId', '$$productId'] },
{ $gte: ['$score', '$$minScore'] }
]}}}
],
as: 'goodReviews'
}
},
{ $addFields: { goodReviewCount: { $size: '$goodReviews' } } }
]).toArray();
4. Transactions multi-documents
Disponibles depuis MongoDB 4.0+ sur replica set (ou Mongos pour sharded cluster).
const { MongoClient } = require('mongodb');
const client = new MongoClient(process.env.MONGODB_URI);
await client.connect();
const session = client.startSession();
try {
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
});
const orders = client.db('shop').collection('orders');
const inventory = client.db('shop').collection('inventory');
const order = await orders.insertOne(
{ userId: 'u1', total: 99.99, status: 'pending', createdAt: new Date() },
{ session }
);
const stock = await inventory.findOne({ productId: 'p1' }, { session });
if (!stock || stock.qty < 1) throw new Error('Stock insuffisant');
await inventory.updateOne(
{ productId: 'p1' },
{ $inc: { qty: -1 } },
{ session }
);
await orders.updateOne(
{ _id: order.insertedId },
{ $set: { status: 'confirmed' } },
{ session }
);
await session.commitTransaction();
console.log('Transaction réussie');
} catch (err) {
await session.abortTransaction();
throw err;
} finally {
await session.endSession();
}
5. Change Streams
Les Change Streams permettent d'écouter les modifications en temps réel sur une collection, une base ou tout le cluster.
// Écouter les insertions et mises à jour
const collection = db.collection('orders');
const changeStream = collection.watch(
[{ $match: { operationType: { $in: ['insert', 'update'] } } }],
{ fullDocument: 'updateLookup' }
);
changeStream.on('change', (change) => {
console.log('Changement détecté :', change.operationType);
if (change.operationType === 'insert') {
// Notifier via WebSocket, Redis Pub/Sub, etc.
notifyNewOrder(change.fullDocument);
}
if (change.operationType === 'update') {
const { updatedFields } = change.updateDescription;
if (updatedFields.status === 'shipped') {
sendShippingEmail(change.fullDocument);
}
}
});
changeStream.on('error', (err) => { /* gestion erreur / reconnexion */ });
// Fermer proprement
process.on('SIGINT', () => changeStream.close());
6. Atlas Search (Full-Text)
// Nécessite MongoDB Atlas avec index de recherche configuré
const results = await db.collection('articles').aggregate([
{
$search: {
index: 'default',
text: {
query: 'postgresql performance',
path: ['title', 'body'],
fuzzy: { maxEdits: 1 }
}
}
},
{
$project: {
title: 1, summary: 1,
score: { $meta: 'searchScore' }
}
},
{ $sort: { score: -1 } },
{ $limit: 10 }
]).toArray();