NestJS EventEmitter2 이벤트 설계

NestJS EventEmitter2란

NestJS의 @nestjs/event-emitter 패키지는 eventemitter2 라이브러리를 래핑하여, 애플리케이션 내부에서 이벤트 기반 아키텍처를 구현합니다. HTTP 요청-응답의 동기적 흐름에서 벗어나, 도메인 이벤트를 발행하고 여러 리스너가 독립적으로 처리하는 느슨한 결합(Loose Coupling)을 실현합니다.

Kafka나 RabbitMQ 같은 외부 메시지 브로커 없이도, 단일 프로세스 내에서 이벤트 드리븐 패턴을 적용할 수 있어 모놀리식 또는 모듈러 모놀리스 구조에 적합합니다.

설치 및 기본 설정

npm install @nestjs/event-emitter
// app.module.ts
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
  imports: [
    EventEmitterModule.forRoot({
      wildcard: true,          // 와일드카드 패턴 지원
      delimiter: '.',          // 이벤트명 구분자
      newListener: false,      // 리스너 추가 이벤트 비활성화
      removeListener: false,   // 리스너 제거 이벤트 비활성화
      maxListeners: 20,        // 메모리 누수 방지
      verboseMemoryLeak: true, // 리스너 초과 시 경고
      ignoreErrors: false,     // 에러 무시하지 않음
    }),
  ],
})
export class AppModule {}

이벤트 정의와 타입 안전성

이벤트를 클래스로 정의하면 타입 안전성과 IDE 자동완성을 확보합니다:

// events/order.events.ts
export class OrderCreatedEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: number,
    public readonly createdAt: Date = new Date(),
  ) {}
}

export class OrderCancelledEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly reason: string,
    public readonly cancelledAt: Date = new Date(),
  ) {}
}

export class PaymentCompletedEvent {
  constructor(
    public readonly orderId: string,
    public readonly paymentId: string,
    public readonly amount: number,
    public readonly method: 'card' | 'bank' | 'point',
  ) {}
}

이벤트명을 상수로 중앙 관리합니다:

// events/event-names.ts
export const EventNames = {
  ORDER_CREATED: 'order.created',
  ORDER_CANCELLED: 'order.cancelled',
  ORDER_SHIPPED: 'order.shipped',
  PAYMENT_COMPLETED: 'payment.completed',
  PAYMENT_FAILED: 'payment.failed',
  USER_REGISTERED: 'user.registered',
  USER_DELETED: 'user.deleted',
} as const;

export type EventName = typeof EventNames[keyof typeof EventNames];

이벤트 발행: EventEmitter2

@Injectable()
export class OrderService {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  async createOrder(dto: CreateOrderDto, userId: string): Promise<Order> {
    const order = await this.orderRepository.save(
      Order.create(dto, userId),
    );

    // 이벤트 발행 — 동기 리스너 실행
    this.eventEmitter.emit(
      EventNames.ORDER_CREATED,
      new OrderCreatedEvent(
        order.id,
        userId,
        order.items,
        order.totalAmount,
      ),
    );

    return order;
  }

  async cancelOrder(orderId: string, userId: string, reason: string) {
    const order = await this.orderRepository.findOneOrFail(orderId);
    order.cancel(reason);
    await this.orderRepository.save(order);

    this.eventEmitter.emit(
      EventNames.ORDER_CANCELLED,
      new OrderCancelledEvent(orderId, userId, reason),
    );
  }
}

이벤트 리스너: @OnEvent

여러 모듈에서 독립적으로 이벤트를 구독합니다:

// notification/notification.listener.ts
@Injectable()
export class NotificationListener {
  private readonly logger = new Logger(NotificationListener.name);

  constructor(
    private readonly emailService: EmailService,
    private readonly pushService: PushService,
  ) {}

  @OnEvent(EventNames.ORDER_CREATED)
  async handleOrderCreated(event: OrderCreatedEvent) {
    this.logger.log(`주문 생성 알림: orderId=${event.orderId}`);

    await Promise.all([
      this.emailService.sendOrderConfirmation(event.userId, event.orderId),
      this.pushService.send(event.userId, {
        title: '주문 완료',
        body: `주문 ${event.orderId}이 접수되었습니다.`,
      }),
    ]);
  }

  @OnEvent(EventNames.ORDER_CANCELLED)
  async handleOrderCancelled(event: OrderCancelledEvent) {
    await this.emailService.sendCancellationNotice(
      event.userId,
      event.orderId,
      event.reason,
    );
  }
}

// analytics/analytics.listener.ts
@Injectable()
export class AnalyticsListener {
  constructor(private readonly analyticsService: AnalyticsService) {}

  @OnEvent(EventNames.ORDER_CREATED)
  async trackOrderCreated(event: OrderCreatedEvent) {
    await this.analyticsService.track({
      event: 'order_created',
      userId: event.userId,
      properties: {
        orderId: event.orderId,
        amount: event.totalAmount,
        itemCount: event.items.length,
      },
    });
  }
}

// inventory/inventory.listener.ts
@Injectable()
export class InventoryListener {
  constructor(private readonly inventoryService: InventoryService) {}

  @OnEvent(EventNames.ORDER_CREATED)
  async reserveStock(event: OrderCreatedEvent) {
    for (const item of event.items) {
      await this.inventoryService.reserve(item.productId, item.quantity);
    }
  }
}

하나의 order.created 이벤트에 3개의 독립 리스너가 반응합니다. OrderService는 알림·분석·재고 로직을 전혀 모릅니다.

비동기 리스너와 에러 처리

@OnEventasync 옵션으로 비동기 실행을 제어합니다:

// 동기 실행 (기본) — emit()이 리스너 완료를 기다림
@OnEvent(EventNames.ORDER_CREATED)
handleSync(event: OrderCreatedEvent) {
  // emit() 호출자가 이 메서드 완료까지 대기
}

// 비동기 실행 — emit()이 즉시 반환, 리스너는 백그라운드 실행
@OnEvent(EventNames.ORDER_CREATED, { async: true })
async handleAsync(event: OrderCreatedEvent) {
  // 느린 작업도 요청 응답에 영향 없음
  await this.heavyProcessing(event);
}

// 비동기 + 에러 안전
@OnEvent(EventNames.ORDER_CREATED, { async: true })
async handleWithErrorHandling(event: OrderCreatedEvent) {
  try {
    await this.externalApi.call(event);
  } catch (error) {
    this.logger.error(
      `이벤트 처리 실패: ${event.orderId}`,
      error.stack,
    );
    // 재시도 큐에 등록
    await this.retryQueue.add('order.created.retry', {
      event,
      attempt: 1,
    });
  }
}
옵션 동작 사용 시나리오
기본 (동기) emit()이 리스너 완료 대기 재고 차감 등 즉시 반영 필수 작업
async: true emit()이 즉시 반환 이메일, 푸시, 분석 등 지연 허용 작업
prependListener: true 리스너 실행 순서 우선 감사 로그 등 반드시 먼저 실행할 작업

와일드카드 패턴

wildcard: true 설정 시 *** 패턴으로 여러 이벤트를 한 번에 구독합니다:

// order.* → order.created, order.cancelled, order.shipped 모두 매칭
@OnEvent('order.*')
handleAllOrderEvents(event: any) {
  this.auditService.log('order', event);
}

// ** → 모든 이벤트 매칭 (글로벌 로거)
@OnEvent('**')
handleAllEvents(event: any) {
  this.logger.debug(`이벤트 발생: ${JSON.stringify(event)}`);
}

테스트 전략

이벤트 기반 코드의 테스트는 발행과 구독을 분리하여 검증합니다:

describe('OrderService', () => {
  let service: OrderService;
  let eventEmitter: EventEmitter2;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      imports: [EventEmitterModule.forRoot()],
      providers: [
        OrderService,
        { provide: OrderRepository, useValue: mockRepository },
      ],
    }).compile();

    service = module.get(OrderService);
    eventEmitter = module.get(EventEmitter2);
  });

  it('주문 생성 시 이벤트를 발행해야 한다', async () => {
    const emitSpy = jest.spyOn(eventEmitter, 'emit');
    mockRepository.save.mockResolvedValue(mockOrder);

    await service.createOrder(createOrderDto, 'user-1');

    expect(emitSpy).toHaveBeenCalledWith(
      'order.created',
      expect.objectContaining({
        orderId: mockOrder.id,
        userId: 'user-1',
      }),
    );
  });
});

describe('NotificationListener', () => {
  it('order.created 이벤트 수신 시 이메일을 발송해야 한다', async () => {
    const event = new OrderCreatedEvent('ORD-001', 'user-1', [], 50000);

    await listener.handleOrderCreated(event);

    expect(emailService.sendOrderConfirmation)
      .toHaveBeenCalledWith('user-1', 'ORD-001');
  });
});

실전 패턴: 도메인 이벤트 + BullMQ 조합

인프로세스 이벤트의 한계(프로세스 재시작 시 유실)를 BullMQ로 보완합니다:

@Injectable()
export class EventBridge {
  constructor(
    @InjectQueue('durable-events')
    private readonly queue: Queue,
  ) {}

  // 즉시 처리가 필요한 이벤트 → EventEmitter2
  // 내구성이 필요한 이벤트 → BullMQ
  @OnEvent('order.created', { async: true })
  async bridgeToQueue(event: OrderCreatedEvent) {
    // Redis 기반 큐에 저장 → 프로세스 재시작 후에도 처리 보장
    await this.queue.add('order.created', event, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
      removeOnComplete: 100,
      removeOnFail: 500,
    });
  }
}

주의사항과 AsyncLocalStorage 컨텍스트

  • 메모리 전용: EventEmitter2는 인프로세스 이벤트입니다. 프로세스 종료 시 미처리 이벤트는 유실됩니다. 중요 이벤트는 BullMQ나 Kafka로 브리지하세요.
  • 순환 참조 주의: 이벤트 리스너가 다시 이벤트를 발행하면 무한 루프 위험이 있습니다.
  • AsyncLocalStorage: 비동기 리스너에서 요청 컨텍스트(requestId 등)가 필요하면 이벤트 페이로드에 포함시키세요.
  • 리스너 등록 순서: 같은 이벤트의 여러 리스너 실행 순서는 모듈 로딩 순서에 의존합니다. 순서가 중요하면 prependListener를 사용하세요.
  • maxListeners: 기본값 10을 초과하면 메모리 누수 경고가 발생합니다. 의도적이라면 값을 높이세요.

정리

NestJS EventEmitter2는 모듈 간 직접 의존성을 제거하고, 하나의 비즈니스 액션에 여러 부수 효과를 느슨하게 연결하는 핵심 도구입니다. 동기/비동기 리스너 선택, 와일드카드 패턴, BullMQ 브리지를 조합하면 단일 프로세스에서도 견고한 이벤트 드리븐 아키텍처를 구축할 수 있습니다. 외부 메시지 브로커가 필요 없는 수준의 이벤트 처리에 최적입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux