NestJS CQRS 이벤트 기반 설계

CQRS란?

CQRS(Command Query Responsibility Segregation)는 데이터의 쓰기(Command)읽기(Query)를 별도의 모델로 분리하는 패턴입니다. 전통적인 CRUD에서는 하나의 모델이 생성·조회·수정·삭제를 모두 담당하지만, CQRS에서는 Command 모델과 Query 모델을 독립적으로 설계합니다.

NestJS는 @nestjs/cqrs 패키지를 통해 Command Bus, Query Bus, Event Bus를 기본 제공합니다. 이 글에서는 실전 구현부터 이벤트 소싱 연동까지 다룹니다.

설치와 모듈 설정

npm install @nestjs/cqrs

// app.module.ts
import { CqrsModule } from '@nestjs/cqrs';

@Module({
  imports: [CqrsModule],  // CommandBus, QueryBus, EventBus 자동 등록
})
export class AppModule {}

Command: 쓰기 작업

Command는 “시스템의 상태를 변경하라”는 의도를 표현합니다. 반환값이 없거나 최소한의 결과(ID 등)만 반환합니다.

// commands/create-order.command.ts
export class CreateOrderCommand {
  constructor(
    public readonly userId: string,
    public readonly items: { productId: string; quantity: number }[],
    public readonly shippingAddress: string,
  ) {}
}

// commands/handlers/create-order.handler.ts
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';

@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepo: OrderRepository,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    const { userId, items, shippingAddress } = command;

    // 비즈니스 검증
    const totalAmount = await this.calculateTotal(items);
    if (totalAmount <= 0) {
      throw new BadRequestException('주문 금액이 유효하지 않습니다');
    }

    // 엔티티 생성
    const order = this.publisher.mergeObjectContext(
      new Order(userId, items, shippingAddress, totalAmount),
    );

    await this.orderRepo.save(order);

    // 도메인 이벤트 발행
    order.commit();

    return order.id;
  }

  private async calculateTotal(items: { productId: string; quantity: number }[]) {
    // 상품 가격 조회 및 합산 로직
    return items.reduce((sum, item) => sum + item.quantity * 10000, 0);
  }
}

컨트롤러에서 Command 실행

@Controller()
export class OrderController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post()
  async createOrder(@Body() dto: CreateOrderDto, @User() user: AuthUser) {
    const orderId = await this.commandBus.execute(
      new CreateOrderCommand(user.id, dto.items, dto.shippingAddress),
    );
    return { orderId };
  }
}

Query: 읽기 작업

Query는 “데이터를 조회하라”는 요청입니다. 시스템 상태를 변경하지 않으며, 읽기에 최적화된 모델을 사용할 수 있습니다.

// queries/get-order-detail.query.ts
export class GetOrderDetailQuery {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
  ) {}
}

// queries/handlers/get-order-detail.handler.ts
@QueryHandler(GetOrderDetailQuery)
export class GetOrderDetailHandler implements IQueryHandler<GetOrderDetailQuery> {
  constructor(
    private readonly readRepo: OrderReadRepository,  // 읽기 전용 저장소
  ) {}

  async execute(query: GetOrderDetailQuery): Promise<OrderDetailDto> {
    const order = await this.readRepo.findDetailView(query.orderId);

    if (!order) {
      throw new NotFoundException(`Order ${query.orderId} not found`);
    }

    // 권한 확인
    if (order.userId !== query.userId) {
      throw new ForbiddenException('접근 권한이 없습니다');
    }

    return order;
  }
}

// queries/handlers/list-orders.handler.ts
export class ListOrdersQuery {
  constructor(
    public readonly userId: string,
    public readonly page: number = 1,
    public readonly limit: number = 20,
    public readonly status?: OrderStatus,
  ) {}
}

@QueryHandler(ListOrdersQuery)
export class ListOrdersHandler implements IQueryHandler<ListOrdersQuery> {
  constructor(private readonly readRepo: OrderReadRepository) {}

  async execute(query: ListOrdersQuery): Promise<PaginatedResult<OrderSummaryDto>> {
    return this.readRepo.findByUserPaginated(
      query.userId,
      query.page,
      query.limit,
      query.status,
    );
  }
}

// 컨트롤러에서 사용
@Get(':id')
async getOrder(@Param('id') id: string, @User() user: AuthUser) {
  return this.queryBus.execute(new GetOrderDetailQuery(id, user.id));
}

Event: 도메인 이벤트

Command 실행 후 발생하는 부수 효과를 Event로 처리합니다. 이메일 발송, 알림, 통계 업데이트 등을 Command Handler에서 분리합니다.

// events/order-created.event.ts
export class OrderCreatedEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly totalAmount: number,
    public readonly items: { productId: string; quantity: number }[],
  ) {}
}

// Order 엔티티 (AggregateRoot 상속)
import { AggregateRoot } from '@nestjs/cqrs';

export class Order extends AggregateRoot {
  constructor(userId, items, address, amount) {
    super();
    this.id = generateId();
    this.userId = userId;
    this.items = items;
    this.address = address;
    this.amount = amount;
    this.status = OrderStatus.CREATED;

    // 도메인 이벤트 등록 (commit() 호출 시 발행)
    this.apply(new OrderCreatedEvent(this.id, userId, amount, items));
  }

  cancel(reason: string) {
    if (this.status !== OrderStatus.CREATED) {
      throw new Error('배송 시작 후에는 취소할 수 없습니다');
    }
    this.status = OrderStatus.CANCELLED;
    this.apply(new OrderCancelledEvent(this.id, this.userId, reason));
  }
}

// events/handlers/order-created.handler.ts
@EventsHandler(OrderCreatedEvent)
export class OrderCreatedHandler implements IEventHandler<OrderCreatedEvent> {
  constructor(
    private readonly mailService: MailService,
    private readonly inventoryService: InventoryService,
    private readonly analyticsService: AnalyticsService,
  ) {}

  async handle(event: OrderCreatedEvent) {
    // 여러 부수 효과를 병렬 처리
    await Promise.allSettled([
      this.mailService.sendOrderConfirmation(event.userId, event.orderId),
      this.inventoryService.reserveStock(event.items),
      this.analyticsService.trackOrder(event),
    ]);
  }
}

핵심: Event Handler가 실패해도 Command 자체는 이미 성공한 상태입니다. Promise.allSettled로 하나의 실패가 다른 처리를 막지 않도록 합니다.

Saga: 복잡한 워크플로우

여러 Command를 순차적으로 실행하는 긴 프로세스를 Saga로 관리합니다.

@Injectable()
export class OrderSaga {
  constructor(private readonly commandBus: CommandBus) {}

  // 주문 생성 → 결제 요청
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map(event => new ProcessPaymentCommand(
        event.orderId,
        event.userId,
        event.totalAmount,
      )),
    );
  };

  // 결제 성공 → 배송 요청
  @Saga()
  paymentProcessed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentProcessedEvent),
      map(event => new RequestShipmentCommand(event.orderId)),
    );
  };

  // 결제 실패 → 주문 취소
  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentFailedEvent),
      map(event => new CancelOrderCommand(event.orderId, '결제 실패')),
    );
  };
}

모듈 구성 패턴

// order.module.ts
const CommandHandlers = [CreateOrderHandler, CancelOrderHandler];
const QueryHandlers = [GetOrderDetailHandler, ListOrdersHandler];
const EventHandlers = [OrderCreatedHandler, OrderCancelledHandler];
const Sagas = [OrderSaga];

@Module({
  imports: [CqrsModule],
  controllers: [OrderController],
  providers: [
    ...CommandHandlers,
    ...QueryHandlers,
    ...EventHandlers,
    ...Sagas,
    OrderRepository,
    OrderReadRepository,
  ],
})
export class OrderModule {}

읽기/쓰기 저장소 분리

CQRS의 진정한 힘은 읽기와 쓰기에 다른 저장소를 사용할 때 나옵니다.

구분 쓰기 모델 읽기 모델
저장소 PostgreSQL (정규화) MongoDB / Elasticsearch (비정규화)
최적화 데이터 무결성, 트랜잭션 조회 속도, 검색
동기화 Event Handler에서 읽기 모델 업데이트 (Eventual Consistency)
// 읽기 모델 동기화 — Event Handler에서
@EventsHandler(OrderCreatedEvent)
export class OrderProjectionHandler implements IEventHandler<OrderCreatedEvent> {
  constructor(
    @InjectModel('OrderView') private readonly orderViewModel: Model<OrderView>,
  ) {}

  async handle(event: OrderCreatedEvent) {
    // MongoDB에 비정규화된 뷰 저장
    await this.orderViewModel.create({
      orderId: event.orderId,
      userId: event.userId,
      totalAmount: event.totalAmount,
      itemCount: event.items.length,
      status: 'CREATED',
      createdAt: new Date(),
    });
  }
}

테스트 전략

describe('CreateOrderHandler', () => {
  let handler: CreateOrderHandler;
  let orderRepo: jest.Mocked<OrderRepository>;
  let publisher: EventPublisher;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        CreateOrderHandler,
        { provide: OrderRepository, useValue: { save: jest.fn() } },
        { provide: EventPublisher, useValue: { mergeObjectContext: jest.fn(o => o) } },
      ],
    }).compile();

    handler = module.get(CreateOrderHandler);
    orderRepo = module.get(OrderRepository);
  });

  it('주문을 생성하고 ID를 반환한다', async () => {
    const command = new CreateOrderCommand('user-1', [
      { productId: 'prod-1', quantity: 2 },
    ], '서울시 강남구');

    const orderId = await handler.execute(command);

    expect(orderId).toBeDefined();
    expect(orderRepo.save).toHaveBeenCalledTimes(1);
  });
});

Command Handler와 Query Handler는 독립적으로 테스트할 수 있습니다. NestJS 테스트 전략에서 다룬 모킹 패턴을 활용하세요. Saga 테스트는 RxJS의 TestScheduler를 사용합니다.

CQRS 도입 기준

도입 적합 과도한 경우
읽기/쓰기 비율이 크게 다름 단순 CRUD 앱
복잡한 비즈니스 로직·워크플로우 팀 규모가 작고 도메인이 단순
읽기/쓰기 독립 스케일링 필요 데이터 정합성이 즉시 필요 (금융 원장)

CQRS는 Spring Event 비동기 처리와 유사한 이벤트 기반 아키텍처이지만, 읽기/쓰기 모델 분리라는 추가 차원이 있습니다. 복잡도와 이점을 저울질한 후 도입하세요.

정리

NestJS CQRS의 핵심은 “명령과 조회를 분리하고, 이벤트로 연결하라”입니다. CommandBus로 쓰기, QueryBus로 읽기, EventBus로 부수 효과를 처리하며, Saga로 복잡한 워크플로우를 조율합니다. @nestjs/cqrs의 AggregateRoot, EventPublisher, Saga 데코레이터를 활용하면 도메인 이벤트 기반 아키텍처를 깔끔하게 구현할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux