Exercices — Module 08

WebSockets, Events & Queues · 5 exercices pratiques

Ex 01 ⭐ Facile

Gateway de notifications

Créez une NotificationsGateway qui permet aux utilisateurs de rejoindre leur room personnelle (user-{userId}) et reçoivent des notifications en temps réel via notify.

Voir la solution
@WebSocketGateway({ cors: { origin: '*' } })
export class NotificationsGateway {
  @WebSocketServer() server: Server;

  @SubscribeMessage('subscribe')
  handleSubscribe(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { userId: number; token: string },
  ) {
    // Vérification simplifiée du token
    const room = `user-${data.userId}`;
    client.join(room);
    client.emit('subscribed', { room });
  }

  // Appelé depuis d'autres services pour envoyer une notification
  sendToUser(userId: number, notification: Notification) {
    this.server.to(`user-${userId}`).emit('notification', notification);
  }

  // Broadcast Ă  tous
  broadcast(event: string, data: any) {
    this.server.emit(event, data);
  }
}
Ex 02 ⭐ Facile

ÉvĂ©nements avec EventEmitter2

Implémentez un systÚme d'événements pour un e-commerce : émettez order.created, order.paid, order.shipped. Créez des listeners pour envoyer des emails et mettre à jour le stock.

Voir la solution
// order-events.ts
export class OrderCreatedEvent { constructor(public readonly order: Order) {} }
export class OrderPaidEvent { constructor(public readonly order: Order) {} }
export class OrderShippedEvent {
  constructor(public readonly order: Order, public readonly trackingNumber: string) {}
}

// orders.service.ts
@Injectable()
export class OrdersService {
  constructor(private eventEmitter: EventEmitter2) {}

  async create(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepo.save(this.orderRepo.create(dto));
    this.eventEmitter.emit('order.created', new OrderCreatedEvent(order));
    return order;
  }
}

// mail.listener.ts
@Injectable()
export class OrderMailListener {
  @OnEvent('order.created')
  sendConfirmation(event: OrderCreatedEvent) {
    console.log(`Sending confirmation email for order #${event.order.id}`);
  }

  @OnEvent('order.shipped')
  sendShippingNotif(event: OrderShippedEvent) {
    console.log(`Tracking: ${event.trackingNumber} for order #${event.order.id}`);
  }
}

// stock.listener.ts
@Injectable()
export class StockListener {
  @OnEvent('order.created')
  async decrementStock(event: OrderCreatedEvent) {
    for (const item of event.order.items) {
      await this.productsService.decrementStock(item.productId, item.quantity);
    }
  }
}
Ex 03 ⭐⭐ Moyen

Queue BullMQ — traitement d'images

Créez une queue image-processing avec BullMQ. Les jobs doivent redimensionner des images en 3 tailles (thumbnail, medium, large). Implémentez le processor avec gestion d'erreurs et retry.

Voir la solution
// image-processing.processor.ts
@Processor('image-processing')
export class ImageProcessor {
  private readonly logger = new Logger(ImageProcessor.name);

  @Process('resize')
  async handleResize(job: Job<{ filePath: string; sizes: number[] }>) {
    const { filePath, sizes } = job.data;
    this.logger.log(`Processing: ${filePath}`);

    const results = [];
    for (let i = 0; i < sizes.length; i++) {
      const size = sizes[i];
      // Simuler le traitement
      await new Promise(r => setTimeout(r, 100));
      results.push({ size, path: `${filePath}-${size}px.webp` });
      await job.progress(Math.round(((i + 1) / sizes.length) * 100));
    }
    return results;
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, err: Error) {
    this.logger.error(`Job ${job.id} failed (attempt ${job.attemptsMade}): ${err.message}`);
  }
}

// upload.service.ts — ajouter un job
async processUpload(filePath: string) {
  return this.imageQueue.add('resize', {
    filePath,
    sizes: [150, 400, 1200],
  }, {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 10,
    removeOnFail: 5,
  });
}
Ex 04 ⭐⭐ Moyen

Chat en temps réel avec rooms

Implémentez un systÚme de chat complet avec : création de rooms, rejoindre/quitter, envoi de messages, liste des membres connectés. Intégrez l'authentification JWT dans la Gateway.

Voir la solution
@WebSocketGateway({ cors: { origin: '*' }, namespace: '/chat' })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer() server: Server;
  private connectedUsers = new Map<string, { userId: number; rooms: Set<string> }>();

  async handleConnection(client: Socket) {
    const token = client.handshake.auth.token;
    try {
      const payload = this.jwtService.verify(token);
      this.connectedUsers.set(client.id, { userId: payload.sub, rooms: new Set() });
    } catch {
      client.disconnect();
    }
  }

  handleDisconnect(client: Socket) {
    const user = this.connectedUsers.get(client.id);
    if (user) {
      user.rooms.forEach(room => {
        this.server.to(room).emit('user:left', { socketId: client.id, userId: user.userId });
      });
      this.connectedUsers.delete(client.id);
    }
  }

  @SubscribeMessage('room:join')
  joinRoom(@ConnectedSocket() client: Socket, @MessageBody() room: string) {
    client.join(room);
    this.connectedUsers.get(client.id)?.rooms.add(room);
    this.server.to(room).emit('user:joined', { socketId: client.id, room });
    return { joined: room };
  }

  @SubscribeMessage('message:send')
  sendMessage(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { room: string; content: string },
  ) {
    const user = this.connectedUsers.get(client.id);
    const message = { content: data.content, userId: user?.userId, timestamp: Date.now() };
    this.server.to(data.room).emit('message:new', message);
    return message;
  }
}
Ex 05 ⭐⭐⭐ Difficile

CQRS — systùme de commandes

Refactorisez un OrdersModule en CQRS : créez PlaceOrderCommand, CancelOrderCommand, GetOrderQuery, GetUserOrdersQuery avec leurs handlers respectifs.

Voir la solution
// Commands
export class PlaceOrderCommand {
  constructor(public readonly dto: CreateOrderDto, public readonly userId: number) {}
}
export class CancelOrderCommand {
  constructor(public readonly orderId: number, public readonly userId: number) {}
}

// Queries
export class GetOrderQuery { constructor(public readonly id: number) {} }
export class GetUserOrdersQuery {
  constructor(public readonly userId: number, public readonly page: number) {}
}

// Handlers
@CommandHandler(PlaceOrderCommand)
export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
  async execute(cmd: PlaceOrderCommand): Promise<Order> {
    // Validation stock, calcul total, sauvegarde
    const order = await this.orderRepo.save({ ...cmd.dto, userId: cmd.userId });
    this.eventBus.publish(new OrderPlacedEvent(order));
    return order;
  }
}

@QueryHandler(GetUserOrdersQuery)
export class GetUserOrdersHandler implements IQueryHandler<GetUserOrdersQuery> {
  async execute(query: GetUserOrdersQuery) {
    return this.orderRepo.findAndCount({
      where: { userId: query.userId },
      take: 10, skip: (query.page - 1) * 10,
    });
  }
}

// Controller
@Post()
create(@Body() dto: CreateOrderDto, @CurrentUser('id') userId: number) {
  return this.commandBus.execute(new PlaceOrderCommand(dto, userId));
}
← Cours ▶ Mini-projet Module 09 →