Exercices â Module 08
WebSockets, Events & Queues · 5 exercices pratiques
Gateway de notifications
Créez une NotificationsGateway qui permet aux utilisateurs de rejoindre leur room personnelle (user-{userId}) et reçoivent des notifications en temps réel via notify.
Voir la solution
@WebSocketGateway({ cors: { origin: '*' } })
export class NotificationsGateway {
@WebSocketServer() server: Server;
@SubscribeMessage('subscribe')
handleSubscribe(
@ConnectedSocket() client: Socket,
@MessageBody() data: { userId: number; token: string },
) {
// Vérification simplifiée du token
const room = `user-${data.userId}`;
client.join(room);
client.emit('subscribed', { room });
}
// Appelé depuis d'autres services pour envoyer une notification
sendToUser(userId: number, notification: Notification) {
this.server.to(`user-${userId}`).emit('notification', notification);
}
// Broadcast Ă tous
broadcast(event: string, data: any) {
this.server.emit(event, data);
}
}
ĂvĂ©nements avec EventEmitter2
Implémentez un systÚme d'événements pour un e-commerce : émettez order.created, order.paid, order.shipped. Créez des listeners pour envoyer des emails et mettre à jour le stock.
Voir la solution
// order-events.ts
export class OrderCreatedEvent { constructor(public readonly order: Order) {} }
export class OrderPaidEvent { constructor(public readonly order: Order) {} }
export class OrderShippedEvent {
constructor(public readonly order: Order, public readonly trackingNumber: string) {}
}
// orders.service.ts
@Injectable()
export class OrdersService {
constructor(private eventEmitter: EventEmitter2) {}
async create(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepo.save(this.orderRepo.create(dto));
this.eventEmitter.emit('order.created', new OrderCreatedEvent(order));
return order;
}
}
// mail.listener.ts
@Injectable()
export class OrderMailListener {
@OnEvent('order.created')
sendConfirmation(event: OrderCreatedEvent) {
console.log(`Sending confirmation email for order #${event.order.id}`);
}
@OnEvent('order.shipped')
sendShippingNotif(event: OrderShippedEvent) {
console.log(`Tracking: ${event.trackingNumber} for order #${event.order.id}`);
}
}
// stock.listener.ts
@Injectable()
export class StockListener {
@OnEvent('order.created')
async decrementStock(event: OrderCreatedEvent) {
for (const item of event.order.items) {
await this.productsService.decrementStock(item.productId, item.quantity);
}
}
}
Queue BullMQ â traitement d'images
Créez une queue image-processing avec BullMQ. Les jobs doivent redimensionner des images en 3 tailles (thumbnail, medium, large). Implémentez le processor avec gestion d'erreurs et retry.
Voir la solution
// image-processing.processor.ts
@Processor('image-processing')
export class ImageProcessor {
private readonly logger = new Logger(ImageProcessor.name);
@Process('resize')
async handleResize(job: Job<{ filePath: string; sizes: number[] }>) {
const { filePath, sizes } = job.data;
this.logger.log(`Processing: ${filePath}`);
const results = [];
for (let i = 0; i < sizes.length; i++) {
const size = sizes[i];
// Simuler le traitement
await new Promise(r => setTimeout(r, 100));
results.push({ size, path: `${filePath}-${size}px.webp` });
await job.progress(Math.round(((i + 1) / sizes.length) * 100));
}
return results;
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`Job ${job.id} failed (attempt ${job.attemptsMade}): ${err.message}`);
}
}
// upload.service.ts â ajouter un job
async processUpload(filePath: string) {
return this.imageQueue.add('resize', {
filePath,
sizes: [150, 400, 1200],
}, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 10,
removeOnFail: 5,
});
}
Chat en temps réel avec rooms
Implémentez un systÚme de chat complet avec : création de rooms, rejoindre/quitter, envoi de messages, liste des membres connectés. Intégrez l'authentification JWT dans la Gateway.
Voir la solution
@WebSocketGateway({ cors: { origin: '*' }, namespace: '/chat' })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;
private connectedUsers = new Map<string, { userId: number; rooms: Set<string> }>();
async handleConnection(client: Socket) {
const token = client.handshake.auth.token;
try {
const payload = this.jwtService.verify(token);
this.connectedUsers.set(client.id, { userId: payload.sub, rooms: new Set() });
} catch {
client.disconnect();
}
}
handleDisconnect(client: Socket) {
const user = this.connectedUsers.get(client.id);
if (user) {
user.rooms.forEach(room => {
this.server.to(room).emit('user:left', { socketId: client.id, userId: user.userId });
});
this.connectedUsers.delete(client.id);
}
}
@SubscribeMessage('room:join')
joinRoom(@ConnectedSocket() client: Socket, @MessageBody() room: string) {
client.join(room);
this.connectedUsers.get(client.id)?.rooms.add(room);
this.server.to(room).emit('user:joined', { socketId: client.id, room });
return { joined: room };
}
@SubscribeMessage('message:send')
sendMessage(
@ConnectedSocket() client: Socket,
@MessageBody() data: { room: string; content: string },
) {
const user = this.connectedUsers.get(client.id);
const message = { content: data.content, userId: user?.userId, timestamp: Date.now() };
this.server.to(data.room).emit('message:new', message);
return message;
}
}
CQRS â systĂšme de commandes
Refactorisez un OrdersModule en CQRS : créez PlaceOrderCommand, CancelOrderCommand, GetOrderQuery, GetUserOrdersQuery avec leurs handlers respectifs.
Voir la solution
// Commands
export class PlaceOrderCommand {
constructor(public readonly dto: CreateOrderDto, public readonly userId: number) {}
}
export class CancelOrderCommand {
constructor(public readonly orderId: number, public readonly userId: number) {}
}
// Queries
export class GetOrderQuery { constructor(public readonly id: number) {} }
export class GetUserOrdersQuery {
constructor(public readonly userId: number, public readonly page: number) {}
}
// Handlers
@CommandHandler(PlaceOrderCommand)
export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
async execute(cmd: PlaceOrderCommand): Promise<Order> {
// Validation stock, calcul total, sauvegarde
const order = await this.orderRepo.save({ ...cmd.dto, userId: cmd.userId });
this.eventBus.publish(new OrderPlacedEvent(order));
return order;
}
}
@QueryHandler(GetUserOrdersQuery)
export class GetUserOrdersHandler implements IQueryHandler<GetUserOrdersQuery> {
async execute(query: GetUserOrdersQuery) {
return this.orderRepo.findAndCount({
where: { userId: query.userId },
take: 10, skip: (query.page - 1) * 10,
});
}
}
// Controller
@Post()
create(@Body() dto: CreateOrderDto, @CurrentUser('id') userId: number) {
return this.commandBus.execute(new PlaceOrderCommand(dto, userId));
}