Temps réel, événements & queues

Au-delà du REST : temps réel et architecture événementielle

NestJS prend en charge les WebSockets via Socket.io, la communication interne par événements avec EventEmitter2 et le traitement asynchrone avec BullMQ + Redis. Ce module présente également le pattern CQRS et un aperçu des microservices.

WebSockets avec Socket.io

npm install @nestjs/websockets @nestjs/platform-socket.io socket.io
// Nothing special is required in app.module.ts: gateways are ordinary providers,
// so listing the gateway class in a module's `providers` array is enough.
// The gateway registered here is the ChatGateway defined in this chapter.
@Module({
  providers: [ChatGateway],
})
export class EventsModule {}

Gateway WebSocket

import {
  WebSocketGateway, WebSocketServer, SubscribeMessage,
  MessageBody, ConnectedSocket, OnGatewayConnection,
  OnGatewayDisconnect, OnGatewayInit,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

/**
 * Socket.io gateway serving the `/chat` namespace.
 *
 * Logs gateway lifecycle events (init, connect, disconnect), relays
 * `sendMessage` events to the target room as `newMessage`, and exposes
 * `emitToAll` so other services can push events through the same server.
 */
@WebSocketGateway({
  // NOTE(review): '*' accepts every origin — presumably fine for a demo, restrict it in production.
  cors: { origin: '*' },
  namespace: '/chat', // Socket.io namespace handled by this gateway
})
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  // Populated through the @WebSocketServer() decorator rather than in a constructor,
  // hence the definite-assignment assertion (required under `strict`).
  @WebSocketServer()
  server!: Server;

  private readonly logger = new Logger(ChatGateway.name);

  /** Lifecycle hook: logs that the gateway has been initialized. */
  afterInit(server: Server): void {
    this.logger.log('WebSocket Gateway initialized');
  }

  /** Lifecycle hook: logs the id of each socket that connects. */
  handleConnection(client: Socket): void {
    this.logger.log(`Client connected: ${client.id}`);
  }

  /** Lifecycle hook: logs the id of each socket that disconnects. */
  handleDisconnect(client: Socket): void {
    this.logger.log(`Client disconnected: ${client.id}`);
  }

  /**
   * Handles the `sendMessage` event: re-emits the message to every socket in
   * `payload.room` as a `newMessage` event stamped with the current ISO time.
   *
   * NOTE(review): `author` and `room` are taken from the client payload as-is;
   * nothing here checks that the sender joined the room or owns that name.
   */
  @SubscribeMessage('sendMessage')
  handleMessage(
    @ConnectedSocket() client: Socket,
    @MessageBody() payload: { room: string; message: string; author: string },
  ): void {
    this.server.to(payload.room).emit('newMessage', {
      message: payload.message,
      author: payload.author,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Emits `event` with `data` through the gateway's server.
   * Intended to be called from other services.
   */
  emitToAll(event: string, data: unknown): void {
    this.server.emit(event, data);
  }
}

Client JavaScript (exemple)

// Connect to the '/chat' namespace exposed by the gateway.
const socket = io('http://localhost:3000/chat');

// Register the listener before joining so no room message is missed.
socket.on('newMessage', (data) => {
  console.log(`${data.author}: ${data.message}`);
});

// The 'joinRoom' handler reads both `room` and `userId` from the payload
// (it re-broadcasts `userId` in 'userJoined'), so send both.
socket.emit('joinRoom', { room: 'general', userId: 'alice' });

socket.emit('sendMessage', {
  room: 'general',
  message: 'Hello world!',
  author: 'Alice',
});

Rooms Socket.io

// Handles 'joinRoom': adds the socket to the room, announces the newcomer to
// the room with 'userJoined', and answers the caller with a 'joined' event.
@SubscribeMessage('joinRoom')
async handleJoinRoom(
  @ConnectedSocket() client: Socket,
  @MessageBody() data: { room: string; userId: string },
) {
  // join() may return a Promise (e.g. with a multi-node adapter); wait for it so
  // the joining socket is in the room before the announcement is emitted.
  await client.join(data.room);
  this.server.to(data.room).emit('userJoined', {
    userId: data.userId,
    room: data.room,
  });
  return { event: 'joined', data: { room: data.room } };
}

// Handles 'leaveRoom': removes the socket from the room, then tells the
// room (with 'userLeft') which socket id has gone.
@SubscribeMessage('leaveRoom')
async handleLeaveRoom(
  @ConnectedSocket() client: Socket,
  @MessageBody() data: { room: string },
) {
  // leave() may return a Promise; wait for it so the departing socket is out of
  // the room before the notification is emitted to the room.
  await client.leave(data.room);
  this.server.to(data.room).emit('userLeft', { socketId: client.id });
}

// Emit `event` with `data` to the sockets of one specific room; meant to be called from a service.
broadcastToRoom(room: string, event: string, data: any) {
  const roomChannel = this.server.to(room);
  roomChannel.emit(event, data);
}

// Emit `event` with `data` to everyone except the given (sending) socket.
broadcastExcept(socket: Socket, event: string, data: any) {
  const everyoneElse = socket.broadcast;
  everyoneElse.emit(event, data);
}

EventEmitter2 — architecture événementielle

npm install @nestjs/event-emitter eventemitter2
// app.module.ts
import { EventEmitterModule } from '@nestjs/event-emitter';

// Root module: registers the event emitter with namespaced, wildcard-capable event names.
@Module({
  imports: [
    EventEmitterModule.forRoot({
      maxListeners: 20,
      delimiter: '.', // separator between namespace segments, e.g. 'user.created'
      wildcard: true, // enables wildcard subscriptions such as 'user.*'
    }),
  ],
})
export class AppModule {}

// Emitting an event
/**
 * Creates users and publishes a `user.created` event for each one.
 */
@Injectable()
export class UsersService {
  constructor(
    private readonly eventEmitter: EventEmitter2,
    // `create()` below relies on `this.userRepo`, so it has to be injected.
    // NOTE(review): the type is assumed to be the UserRepository used by the
    // CQRS handlers in this chapter, exposing `create()` and `save()` — confirm.
    private readonly userRepo: UserRepository,
  ) {}

  /**
   * Builds an entity from `dto`, saves it, then emits `user.created` carrying
   * a UserCreatedEvent wrapping the saved user.
   *
   * @returns the saved user
   */
  async create(dto: CreateUserDto): Promise<User> {
    const user = await this.userRepo.save(this.userRepo.create(dto));
    this.eventEmitter.emit('user.created', new UserCreatedEvent(user));
    return user;
  }
}

// Event class
/** Payload of the `user.created` event: carries the user it refers to. */
export class UserCreatedEvent {
  public readonly user: User;

  constructor(user: User) {
    this.user = user;
  }
}

// Listening to events
/**
 * Reacts to user events: queues a welcome e-mail when a user is created and
 * logs every event in the `user.*` namespace.
 */
@Injectable()
export class MailListener {
  // The handler below needs the mail service, so it is injected here.
  constructor(private readonly mailService: MailService) {}

  /** Handles `user.created` by asking MailService to send the welcome e-mail. */
  @OnEvent('user.created')
  async handleUserCreated(event: UserCreatedEvent): Promise<void> {
    // MailService in this chapter exposes sendWelcomeEmail(user), which reads
    // the user's email and name itself.
    await this.mailService.sendWelcomeEmail(event.user);
  }

  /** Wildcard listener: receives every event whose name matches `user.*`. */
  @OnEvent('user.*')
  logUserEvent(event: unknown): void {
    console.log('User event:', event);
  }
}

BullMQ — File de tâches

npm install @nestjs/bullmq bullmq ioredis
// app.module.ts
import { BullModule } from '@nestjs/bullmq';

// Root module: configures the Redis connection used by BullMQ and declares the queues.
@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        // Environment values win; otherwise fall back to a local Redis on 6379.
        port: Number(process.env.REDIS_PORT) || 6379,
        host: process.env.REDIS_HOST || 'localhost',
      },
    }),
    // Both queues declared in a single registerQueue() call.
    BullModule.registerQueue({ name: 'mail' }, { name: 'image-processing' }),
  ],
})
export class AppModule {}

// Adding a job to the queue
/** Producer side of the 'mail' queue. */
@Injectable()
export class MailService {
  constructor(@InjectQueue('mail') private mailQueue: Queue) {}

  /** Enqueues a 'welcome' job carrying the user's e-mail address and name. */
  async sendWelcomeEmail(user: User) {
    const { email, name } = user;
    const payload = { to: email, name };

    await this.mailQueue.add('welcome', payload, {
      delay: 1000, // wait 1 second before the job becomes processable
      attempts: 3, // attempt the job up to 3 times
      backoff: { type: 'exponential', delay: 5000 },
      removeOnComplete: true,
    });
  }
}

// Processor — consumes the jobs of the 'mail' queue
import { WorkerHost } from '@nestjs/bullmq';

/**
 * Worker for the 'mail' queue.
 *
 * With @nestjs/bullmq a processor extends WorkerHost and implements a single
 * `process()` method; the `@Process('name')` decorator belongs to the older
 * @nestjs/bull package and is not available here. Named jobs are therefore
 * dispatched on `job.name`.
 */
@Processor('mail')
export class MailProcessor extends WorkerHost {
  private readonly logger = new Logger(MailProcessor.name);

  /**
   * Entry point invoked for every job of the queue; routes by job name.
   *
   * @throws Error when the job name has no handler, so the job is marked as failed
   */
  async process(job: Job<{ to: string; name: string }>): Promise<void> {
    switch (job.name) {
      case 'welcome':
        await this.handleWelcome(job);
        return;
      default:
        throw new Error(`No handler for job "${job.name}" in queue "mail"`);
    }
  }

  /** Sends the welcome e-mail described by the job payload. */
  async handleWelcome(job: Job<{ to: string; name: string }>) {
    const { to, name } = job.data;
    this.logger.log(`Sending welcome email to ${to}`);
    // NOTE(review): sendEmail is not defined in this snippet — presumably
    // provided by a mail transport elsewhere; confirm.
    await this.sendEmail(to, `Bienvenue ${name} !`);
  }

  /** Worker event: logs the id of each completed job. */
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.log(`Job ${job.id} completed`);
  }

  /** Worker event: logs the id and error message of each failed job. */
  @OnWorkerEvent('failed')
  onFailed(job: Job, err: Error) {
    this.logger.error(`Job ${job.id} failed: ${err.message}`);
  }
}

CQRS — Command Query Responsibility Segregation

npm install @nestjs/cqrs
// Command
/** Write-side request to create a user from the given DTO. */
export class CreateUserCommand {
  public readonly dto: CreateUserDto;

  constructor(dto: CreateUserDto) {
    this.dto = dto;
  }
}

// Command handler
/** Executes CreateUserCommand by delegating to the user repository. */
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(private userRepo: UserRepository) {}

  /** Creates the user described by the command's DTO and returns it. */
  async execute(command: CreateUserCommand): Promise<User> {
    const { dto } = command;
    const createdUser = await this.userRepo.create(dto);
    return createdUser;
  }
}

// Query
/** Read-side request for one page of users. */
export class GetUsersQuery {
  public readonly page: number;
  public readonly limit: number;

  constructor(page: number, limit: number) {
    this.page = page;
    this.limit = limit;
  }
}

// Query handler
/** Executes GetUsersQuery by reading one page of users from the repository. */
@QueryHandler(GetUsersQuery)
export class GetUsersHandler implements IQueryHandler<GetUsersQuery> {
  // `execute()` reads from `this.userRepo`, so the repository must be injected —
  // same dependency as CreateUserHandler.
  constructor(private readonly userRepo: UserRepository) {}

  /** Returns the users of page `query.page`, with `query.limit` items per page. */
  async execute(query: GetUsersQuery): Promise<User[]> {
    return this.userRepo.findPaginated(query.page, query.limit);
  }
}

// CQRS module
// Command and query handlers are registered as regular providers.
const userCqrsHandlers = [CreateUserHandler, GetUsersHandler];

@Module({
  imports: [CqrsModule],
  providers: userCqrsHandlers,
})
export class UsersModule {}

// Usage from a controller
/** HTTP entry point for users: writes go through the command bus, reads through the query bus. */
@Controller('users')
export class UsersController {
  constructor(
    private commandBus: CommandBus,
    private queryBus: QueryBus,
  ) {}

  /** POST /users — dispatches a CreateUserCommand built from the request body. */
  @Post()
  create(@Body() dto: CreateUserDto) {
    const command = new CreateUserCommand(dto);
    return this.commandBus.execute(command);
  }

  /** GET /users — dispatches a GetUsersQuery; `page` defaults to 1 and `limit` to 10. */
  @Get()
  findAll(@Query('page') page = 1, @Query('limit') limit = 10) {
    // Coerce to numbers before building the query (query-string values are
    // presumably strings at runtime — confirm).
    const query = new GetUsersQuery(Number(page), Number(limit));
    return this.queryBus.execute(query);
  }
}

Microservices — aperçu

npm install @nestjs/microservices
// Start a microservice over the TCP transport, listening on localhost:3001
const tcpEndpoint = { host: 'localhost', port: 3001 };
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.TCP,
  options: tcpEndpoint,
});
await app.listen();

// Alternative: Redis pub/sub transport (use this instead of the TCP bootstrap, not alongside it)
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.REDIS,
  options: { host: 'localhost', port: 6379 },
});
// As with the TCP variant, the microservice only starts handling messages once listen() is called.
await app.listen();

// Handler inside the microservice: answers messages sent with the 'get_user' pattern
@MessagePattern('get_user')
async getUser(@Payload() data: { id: number }) {
  const { id } = data;
  return this.usersService.findOne(id);
}

// Microservice client (in the main application)
// Registers a TCP client under the 'USER_SERVICE' injection token, pointing at localhost:3001.
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USER_SERVICE',
        transport: Transport.TCP,
        options: { port: 3001, host: 'localhost' },
      },
    ]),
  ],
})
export class AppModule {}

/** Calls the user microservice through the client registered as 'USER_SERVICE'. */
@Injectable()
export class AppService {
  constructor(@Inject('USER_SERVICE') private client: ClientProxy) {}

  /** Sends a 'get_user' message carrying `{ id }` and returns whatever `send()` returns. */
  getUser(id: number) {
    const payload = { id };
    return this.client.send('get_user', payload);
  }
}
✏️ Exercices du module ▶ Mini-projet Module 09 →