Temps réel, événements & queues
Au-delà du REST : temps réel et architecture événementielle
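NestJS ne se limite pas au REST : il prend en charge les WebSockets via Socket.io, la communication interne par événements avec EventEmitter2 et le traitement asynchrone avec BullMQ + Redis. Ce chapitre donne aussi un aperçu du pattern CQRS et des microservices.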
WebSockets avec Socket.io
npm install @nestjs/websockets @nestjs/platform-socket.io socket.io
// Module registration — nothing else to add: gateways are regular providers.
@Module({
  // Register the gateway class that is actually defined in this guide (ChatGateway).
  providers: [ChatGateway],
})
export class EventsModule {}
Gateway WebSocket
import {
WebSocketGateway, WebSocketServer, SubscribeMessage,
MessageBody, ConnectedSocket, OnGatewayConnection,
OnGatewayDisconnect, OnGatewayInit,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
/**
 * Socket.io gateway bound to the `/chat` namespace.
 *
 * Logs gateway initialisation and client connect/disconnect, relays
 * `sendMessage` events to the target room as `newMessage`, and exposes
 * `emitToAll` so other services can push events through the same server.
 */
@WebSocketGateway({
  // NOTE(review): a wildcard origin accepts connections from any site — restrict it outside local development.
  cors: { origin: '*' },
  namespace: '/chat', // Socket.io namespace
})
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  /** Socket.io server instance, populated through the `@WebSocketServer()` decorator. */
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger(ChatGateway.name);

  /** Lifecycle hook (OnGatewayInit): logs that the gateway is ready. */
  afterInit(server: Server) {
    this.logger.log('WebSocket Gateway initialized');
  }

  /** Lifecycle hook (OnGatewayConnection): logs the id of the connecting client. */
  handleConnection(client: Socket) {
    this.logger.log(`Client connected: ${client.id}`);
  }

  /** Lifecycle hook (OnGatewayDisconnect): logs the id of the disconnecting client. */
  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
  }

  /**
   * Handles the 'sendMessage' event.
   *
   * @param client  Socket that sent the message (not used by the relay itself).
   * @param payload Target room, message text and author name as sent by the client.
   */
  @SubscribeMessage('sendMessage')
  handleMessage(
    @ConnectedSocket() client: Socket,
    @MessageBody() payload: { room: string; message: string; author: string },
  ) {
    // Broadcast to every member of the room, stamping the message server-side.
    // NOTE(review): `author` is taken as-is from the client payload — confirm whether it should be derived from the session instead.
    this.server.to(payload.room).emit('newMessage', {
      message: payload.message,
      author: payload.author,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Emits an event to every client of this gateway's server; meant to be called from other services.
   *
   * @param event Event name.
   * @param data  Payload forwarded untouched, hence typed `unknown` rather than `any`.
   */
  emitToAll(event: string, data: unknown) {
    this.server.emit(event, data);
  }
}
Client JavaScript (exemple)
// Connect to the '/chat' namespace exposed by the gateway.
const socket = io('http://localhost:3000/chat');

// Join a room first. The server-side 'joinRoom' handler expects both `room` and
// `userId` and rebroadcasts `userId` in 'userJoined', so it must be provided.
socket.emit('joinRoom', { room: 'general', userId: 'alice' });

// Print every message relayed by the server.
socket.on('newMessage', (data) => {
  console.log(`${data.author}: ${data.message}`);
});

// Send a message to the room.
socket.emit('sendMessage', {
  room: 'general',
  message: 'Hello world!',
  author: 'Alice',
});
Rooms Socket.io
/**
 * Handles 'joinRoom': adds the client to the room, announces it to the room
 * with 'userJoined', and acknowledges to the caller with a 'joined' event.
 *
 * @param client Socket of the joining client.
 * @param data   Room to join and id of the joining user.
 * @returns The `{ event, data }` response sent back to the caller.
 */
@SubscribeMessage('joinRoom')
async handleJoinRoom(
  @ConnectedSocket() client: Socket,
  @MessageBody() data: { room: string; userId: string },
) {
  // join() returns a Promise with asynchronous adapters; wait for it so the
  // membership is effective before the announcement below is emitted.
  await client.join(data.room);
  this.server.to(data.room).emit('userJoined', {
    userId: data.userId,
    room: data.room,
  });
  return { event: 'joined', data: { room: data.room } };
}
/**
 * Handles 'leaveRoom': removes the client from the room, then notifies the
 * room with 'userLeft' carrying the socket id of the client that left.
 *
 * @param client Socket of the leaving client.
 * @param data   Room to leave.
 */
@SubscribeMessage('leaveRoom')
async handleLeaveRoom(
  @ConnectedSocket() client: Socket,
  @MessageBody() data: { room: string },
) {
  // leave() returns a Promise with asynchronous adapters; wait for it so the
  // client is out of the room before the notification is emitted.
  await client.leave(data.room);
  this.server.to(data.room).emit('userLeft', { socketId: client.id });
}
/**
 * Emits an event to a specific room; meant to be called from a service.
 *
 * @param room  Target room name.
 * @param event Event name.
 * @param data  Payload forwarded untouched, hence typed `unknown` rather than `any`.
 */
broadcastToRoom(room: string, event: string, data: unknown) {
  this.server.to(room).emit(event, data);
}
/**
 * Emits an event to everyone except the given socket (uses `socket.broadcast`).
 *
 * @param socket Socket of the sender to exclude.
 * @param event  Event name.
 * @param data   Payload forwarded untouched, hence typed `unknown` rather than `any`.
 */
broadcastExcept(socket: Socket, event: string, data: unknown) {
  socket.broadcast.emit(event, data);
}
EventEmitter2 — architecture événementielle
npm install @nestjs/event-emitter eventemitter2
// app.module.ts
import { EventEmitterModule } from '@nestjs/event-emitter';
// Root module: registers the event emitter with wildcard support so that
// listeners can subscribe to namespaced patterns such as 'user.*'.
@Module({
imports: [
EventEmitterModule.forRoot({
wildcard: true, // Enable wildcards: 'user.*'
delimiter: '.', // Separator between namespace segments
maxListeners: 20,
}),
],
})
export class AppModule {}
// Emitting an event
/**
 * Creates users and publishes a 'user.created' event for each one.
 */
@Injectable()
export class UsersService {
  constructor(
    private eventEmitter: EventEmitter2,
    // The original snippet used `this.userRepo` without ever declaring it.
    // NOTE(review): the repository type is not shown here; `UserRepository` is the
    // name used by the CQRS handlers in this guide — confirm it exposes create() and save().
    private readonly userRepo: UserRepository,
  ) {}

  /**
   * Persists a new user, then emits 'user.created' carrying the saved entity.
   *
   * @param dto Data for the user to create.
   * @returns The saved user.
   */
  async create(dto: CreateUserDto): Promise<User> {
    const user = await this.userRepo.save(this.userRepo.create(dto));
    // Emit only after the save has resolved, so listeners receive the saved entity.
    this.eventEmitter.emit('user.created', new UserCreatedEvent(user));
    return user;
  }
}
// Event class
/** Payload carried by the 'user.created' event: wraps the user it was created for. */
export class UserCreatedEvent {
  public readonly user: User;

  constructor(user: User) {
    this.user = user;
  }
}
// Listening to events
/**
 * Reacts to user events: sends the welcome e-mail on 'user.created' and logs
 * every event matching 'user.*'.
 */
@Injectable()
export class MailListener {
  // The original snippet used `this.mailService` without injecting it.
  constructor(private readonly mailService: MailService) {}

  /**
   * Sends the welcome e-mail for a newly created user.
   *
   * @param event Event carrying the created user.
   */
  @OnEvent('user.created')
  async handleUserCreated(event: UserCreatedEvent) {
    // Uses the method the MailService in this guide actually exposes: sendWelcomeEmail(user).
    await this.mailService.sendWelcomeEmail(event.user);
  }

  /**
   * Logs any event in the 'user' namespace (wildcard subscription).
   *
   * @param event Payload of whichever user event fired; its shape depends on the event.
   */
  @OnEvent('user.*') // Wildcard — every 'user.*' event
  logUserEvent(event: unknown) {
    console.log('User event:', event);
  }
}
BullMQ — File de tâches
npm install @nestjs/bullmq bullmq ioredis
// app.module.ts
import { BullModule, WorkerHost } from '@nestjs/bullmq';
// Redis connection shared by every queue: environment values take precedence,
// with localhost:6379 as the fallback.
const redisConnection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
};

/** Root module: configures BullMQ and registers the 'mail' and 'image-processing' queues. */
@Module({
  imports: [
    BullModule.forRoot({ connection: redisConnection }),
    BullModule.registerQueue({ name: 'mail' }),
    BullModule.registerQueue({ name: 'image-processing' }),
  ],
})
export class AppModule {}
// Adding a job to the queue
/** Enqueues mail jobs on the 'mail' queue. */
@Injectable()
export class MailService {
  constructor(@InjectQueue('mail') private mailQueue: Queue) {}

  /**
   * Enqueues a 'welcome' job for the given user.
   *
   * @param user User whose e-mail and name become the job payload.
   */
  async sendWelcomeEmail(user: User) {
    const { email: to, name } = user;

    await this.mailQueue.add(
      'welcome',
      { to, name },
      {
        delay: 1000, // 1-second delay
        attempts: 3, // Up to 3 attempts on failure
        backoff: { type: 'exponential', delay: 5000 },
        removeOnComplete: true,
      },
    );
  }
}
// Processor — consumes the jobs of the 'mail' queue
/**
 * Worker for the 'mail' queue.
 *
 * With @nestjs/bullmq a processor extends `WorkerHost` and implements
 * `process()`; the `@Process('name')` decorator only exists in @nestjs/bull,
 * so job names are dispatched manually here.
 */
@Processor('mail')
export class MailProcessor extends WorkerHost {
  private readonly logger = new Logger(MailProcessor.name);

  /**
   * Entry point called for every job of the queue; routes on the job name.
   *
   * @param job Job pulled from the 'mail' queue.
   * @throws Error when the job name has no handler, so the job is marked failed instead of silently completing.
   */
  async process(job: Job<{ to: string; name: string }>): Promise<void> {
    switch (job.name) {
      case 'welcome':
        await this.handleWelcome(job);
        return;
      default:
        throw new Error(`No handler for mail job "${job.name}"`);
    }
  }

  /**
   * Sends the welcome e-mail described by the job payload.
   *
   * @param job Job whose data holds the recipient address and name.
   */
  async handleWelcome(job: Job<{ to: string; name: string }>) {
    const { to, name } = job.data;
    this.logger.log(`Sending welcome email to ${to}`);
    // NOTE(review): `sendEmail` is not defined in this snippet — it must be provided by this class or an injected mailer.
    await this.sendEmail(to, `Bienvenue ${name} !`);
  }

  /** Logs the id of each completed job. */
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.log(`Job ${job.id} completed`);
  }

  /**
   * Logs failed jobs.
   *
   * @param job Failed job; BullMQ may pass `undefined` here, so it is read defensively.
   * @param err Error that caused the failure.
   */
  @OnWorkerEvent('failed')
  onFailed(job: Job | undefined, err: Error) {
    this.logger.error(`Job ${job?.id ?? 'unknown'} failed: ${err.message}`);
  }
}
CQRS — Command Query Responsibility Segregation
npm install @nestjs/cqrs
// Command
/** Command asking for a user to be created from the given DTO. */
export class CreateUserCommand {
  public readonly dto: CreateUserDto;

  constructor(dto: CreateUserDto) {
    this.dto = dto;
  }
}
// Command handler
/** Executes CreateUserCommand by delegating to the user repository. */
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(private userRepo: UserRepository) {}

  /**
   * @param command Command carrying the creation DTO.
   * @returns Whatever the repository's create() yields for that DTO.
   */
  async execute(command: CreateUserCommand): Promise<User> {
    const { dto } = command;
    return this.userRepo.create(dto);
  }
}
// Query
/** Query for one page of users. */
export class GetUsersQuery {
  public readonly page: number;
  public readonly limit: number;

  constructor(page: number, limit: number) {
    this.page = page;
    this.limit = limit;
  }
}
// Query handler
/** Executes GetUsersQuery by delegating to the user repository. */
@QueryHandler(GetUsersQuery)
export class GetUsersHandler implements IQueryHandler<GetUsersQuery> {
  // The original snippet used `this.userRepo` without declaring it; inject it
  // the same way CreateUserHandler does.
  constructor(private userRepo: UserRepository) {}

  /**
   * @param query Query carrying the page number and page size.
   * @returns The users returned by the repository for that page.
   */
  async execute(query: GetUsersQuery): Promise<User[]> {
    return this.userRepo.findPaginated(query.page, query.limit);
  }
}
// CQRS module
// Handlers are grouped by kind and spread into the providers list.
const commandHandlers = [CreateUserHandler];
const queryHandlers = [GetUsersHandler];

/** Wires the CQRS buses together with the user command and query handlers. */
@Module({
  imports: [CqrsModule],
  providers: [...commandHandlers, ...queryHandlers],
})
export class UsersModule {}
// Usage from a controller
/** HTTP entry point for users: writes go through the command bus, reads through the query bus. */
@Controller('users')
export class UsersController {
  constructor(
    private commandBus: CommandBus,
    private queryBus: QueryBus,
  ) {}

  /** POST /users — dispatches a CreateUserCommand built from the request body. */
  @Post()
  create(@Body() dto: CreateUserDto) {
    const command = new CreateUserCommand(dto);
    return this.commandBus.execute(command);
  }

  /**
   * GET /users — dispatches a GetUsersQuery.
   * `page` and `limit` default to 1 and 10 and are coerced to numbers before use.
   */
  @Get()
  findAll(@Query('page') page = 1, @Query('limit') limit = 10) {
    const query = new GetUsersQuery(Number(page), Number(limit));
    return this.queryBus.execute(query);
  }
}
Microservices — aperçu
npm install @nestjs/microservices
// Start a TCP microservice: built from AppModule with host 'localhost' and port 3001.
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: { host: 'localhost', port: 3001 },
});
// Start the microservice listener.
await app.listen();
// Alternative: Redis pub/sub transport, with host 'localhost' and port 6379.
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.REDIS,
  options: { host: 'localhost', port: 6379 },
});
// As with the TCP variant, the microservice only starts once listen() is called.
await app.listen();
// Handler inside the microservice
/**
 * Answers the 'get_user' message pattern.
 *
 * @param data Message payload holding the id of the requested user.
 * @returns The result of `usersService.findOne` for that id.
 */
@MessagePattern('get_user')
async getUser(@Payload() data: { id: number }) {
  const { id } = data;
  return this.usersService.findOne(id);
}
// Microservice client (in the main application): registers a TCP client under
// the 'USER_SERVICE' injection token, configured with host 'localhost' and port 3001.
@Module({
imports: [
ClientsModule.register([{
name: 'USER_SERVICE',
transport: Transport.TCP,
options: { host: 'localhost', port: 3001 },
}]),
],
})
export class AppModule {}
/** Sends requests to the user microservice through the client injected under 'USER_SERVICE'. */
@Injectable()
export class AppService {
  constructor(@Inject('USER_SERVICE') private client: ClientProxy) {}

  /**
   * Sends a 'get_user' message for the given id.
   *
   * @param id Id of the user to fetch.
   * @returns Whatever `client.send` returns for that message.
   */
  getUser(id: number) {
    const payload = { id };
    return this.client.send('get_user', payload);
  }
}