NestJS CQRS: CommandBus

NestJS에서 @nestjs/cqrs 패키지는 Command Query Responsibility Segregation 패턴을 프레임워크 수준에서 지원합니다. 단순 CRUD를 넘어 복잡한 도메인 로직을 다룰 때, CQRS는 쓰기(Command)와 읽기(Query)를 물리적으로 분리하여 각각 독립적으로 최적화할 수 있게 합니다.

1. CQRS 핵심 구조: CommandBus · QueryBus · EventBus

CQRS의 세 가지 버스는 각각 다른 책임을 갖습니다:

버스 역할 핸들러 반환값
CommandBus 상태 변경 (쓰기) @CommandHandler void 또는 결과
QueryBus 데이터 조회 (읽기) @QueryHandler 조회 결과
EventBus 부수 효과 전파 @EventsHandler void

Command 정의와 Handler

// create-order.command.ts
export class CreateOrderCommand {
  constructor(
    public readonly userId: string,
    public readonly items: OrderItemDto[],
    public readonly shippingAddress: AddressDto,
  ) {}
}

// create-order.handler.ts
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly orderRepo: OrderRepository,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    const order = Order.create(
      command.userId,
      command.items,
      command.shippingAddress,
    );

    await this.orderRepo.save(order);

    // 도메인 이벤트 발행
    order.getUncommittedEvents().forEach(event =>
      this.eventBus.publish(event),
    );

    return order.id;
  }
}

Query 정의와 Handler

// get-order.query.ts
export class GetOrderQuery {
  constructor(public readonly orderId: string) {}
}

@QueryHandler(GetOrderQuery)
export class GetOrderHandler implements IQueryHandler<GetOrderQuery> {
  constructor(private readonly readRepo: OrderReadRepository) {}

  async execute(query: GetOrderQuery): Promise<OrderReadModel> {
    // 읽기 전용 모델에서 조회 — 쓰기 DB와 분리 가능
    return this.readRepo.findById(query.orderId);
  }
}

모듈 등록

@Module({
  imports: [CqrsModule],
  providers: [
    CreateOrderHandler,
    GetOrderHandler,
    OrderCreatedHandler, // EventsHandler
  ],
})
export class OrderModule {}

2. AggregateRoot와 도메인 이벤트

AggregateRoot를 상속하면 엔티티 내부에서 이벤트를 쌓아두고 한 번에 발행할 수 있습니다:

export class Order extends AggregateRoot {
  private status: OrderStatus;
  private items: OrderItem[];

  static create(userId: string, items: OrderItemDto[], address: AddressDto): Order {
    const order = new Order();
    order.id = uuid();
    order.status = OrderStatus.CREATED;

    // 이벤트를 내부 버퍼에 쌓기
    order.apply(new OrderCreatedEvent(order.id, userId, items, address));
    return order;
  }

  confirm() {
    if (this.status !== OrderStatus.CREATED) {
      throw new IllegalStateException('확인 불가 상태');
    }
    this.apply(new OrderConfirmedEvent(this.id));
  }

  // apply()가 호출하는 이벤트 핸들러 (선택적)
  onOrderCreatedEvent(event: OrderCreatedEvent) {
    this.items = event.items.map(i => OrderItem.from(i));
  }

  onOrderConfirmedEvent(event: OrderConfirmedEvent) {
    this.status = OrderStatus.CONFIRMED;
  }
}
⚠️ 주의: apply()는 내부 버퍼에 이벤트를 쌓을 뿐, 즉시 발행하지 않습니다. commit() 또는 mergeObjectContext()를 통해 EventBus에 실제 발행해야 합니다.

3. Saga 오케스트레이션

Saga는 여러 Aggregate를 걸치는 비즈니스 프로세스를 오케스트레이션합니다. NestJS에서는 RxJS Observable을 반환하는 @Saga() 데코레이터로 구현합니다:

@Injectable()
export class OrderSaga {
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map(event => new ReserveInventoryCommand(event.orderId, event.items)),
    );
  };

  @Saga()
  inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(InventoryReservedEvent),
      map(event => new ProcessPaymentCommand(event.orderId, event.totalAmount)),
    );
  };

  // 보상 트랜잭션: 결제 실패 시 재고 복원
  @Saga()
  paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(PaymentFailedEvent),
      map(event => new ReleaseInventoryCommand(event.orderId)),
    );
  };
}
Saga 패턴 특징 적합한 상황
오케스트레이션 중앙 조정자가 흐름 제어 복잡한 보상 로직, 순서 보장
코레오그래피 각 서비스가 이벤트에 반응 단순한 파이프라인, 느슨한 결합

4. Event Sourcing 통합

Event Sourcing은 현재 상태 대신 이벤트의 시퀀스를 저장합니다. 모든 상태 변화가 이벤트로 기록되므로 완벽한 감사 로그와 시간 여행 디버깅이 가능합니다:

// event-store.service.ts
@Injectable()
export class EventStoreService {
  constructor(
    @InjectRepository(StoredEvent)
    private readonly eventRepo: Repository<StoredEvent>,
  ) {}

  async saveEvents(aggregateId: string, events: IEvent[], expectedVersion: number) {
    // 낙관적 동시성 제어
    const currentVersion = await this.getCurrentVersion(aggregateId);
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyException(
        `Expected version ${expectedVersion}, got ${currentVersion}`,
      );
    }

    const storedEvents = events.map((event, i) => ({
      aggregateId,
      version: expectedVersion + i + 1,
      eventType: event.constructor.name,
      payload: JSON.stringify(event),
      occurredAt: new Date(),
    }));

    await this.eventRepo.save(storedEvents);
  }

  async getEvents(aggregateId: string): Promise<StoredEvent[]> {
    return this.eventRepo.find({
      where: { aggregateId },
      order: { version: 'ASC' },
    });
  }
}

// Aggregate 복원
async loadAggregate(id: string): Promise<Order> {
  const events = await this.eventStore.getEvents(id);
  const order = new Order();

  events.forEach(stored => {
    const event = this.deserialize(stored);
    order.apply(event, { fromHistory: true }); // replay
  });

  return order;
}

스냅샷 최적화

이벤트가 수백 개 쌓이면 복원이 느려집니다. 주기적으로 스냅샷을 저장하고, 스냅샷 이후 이벤트만 재생하는 방식으로 최적화합니다:

async loadWithSnapshot(id: string): Promise<Order> {
  const snapshot = await this.snapshotRepo.findLatest(id);
  const fromVersion = snapshot ? snapshot.version : 0;

  const events = await this.eventStore.getEventsAfter(id, fromVersion);
  const order = snapshot
    ? Order.fromSnapshot(snapshot.data)
    : new Order();

  events.forEach(e => order.apply(this.deserialize(e), { fromHistory: true }));
  return order;
}

// 매 100번째 이벤트마다 스냅샷 저장
if (order.version % 100 === 0) {
  await this.snapshotRepo.save({
    aggregateId: id,
    version: order.version,
    data: order.toSnapshot(),
  });
}

5. Read Model 프로젝션

이벤트를 구독하여 읽기 전용 모델(Read Model)을 비동기로 갱신합니다. 읽기 모델은 조회에 최적화된 구조(비정규화, 캐시 등)로 자유롭게 설계할 수 있습니다:

// order-projection.handler.ts
@EventsHandler(OrderCreatedEvent, OrderConfirmedEvent, OrderShippedEvent)
export class OrderProjectionHandler
  implements IEventHandler<OrderCreatedEvent | OrderConfirmedEvent | OrderShippedEvent>
{
  constructor(
    private readonly readRepo: OrderReadRepository,
    private readonly statsRepo: OrderStatsRepository,
  ) {}

  async handle(event: OrderCreatedEvent | OrderConfirmedEvent | OrderShippedEvent) {
    if (event instanceof OrderCreatedEvent) {
      // 상세 Read Model
      await this.readRepo.upsert({
        orderId: event.orderId,
        userId: event.userId,
        status: 'CREATED',
        itemCount: event.items.length,
        totalAmount: event.items.reduce((sum, i) => sum + i.price * i.qty, 0),
        createdAt: event.occurredAt,
      });

      // 통계 Read Model
      await this.statsRepo.incrementDailyOrders(event.occurredAt);
    }

    if (event instanceof OrderConfirmedEvent) {
      await this.readRepo.updateStatus(event.orderId, 'CONFIRMED');
      await this.statsRepo.incrementConfirmed(event.occurredAt);
    }

    if (event instanceof OrderShippedEvent) {
      await this.readRepo.updateStatus(event.orderId, 'SHIPPED');
    }
  }
}
프로젝션 전략 설명 지연
동기 프로젝션 같은 트랜잭션에서 갱신 0ms (강한 일관성)
비동기 프로젝션 EventBus → 별도 핸들러 ms~초 (최종 일관성)
Catch-up 프로젝션 이벤트 스토어 폴링 초~분 (재구축 가능)

6. 이벤트 스키마 버전 관리

시간이 지나면 이벤트 구조가 변합니다. Upcasting 패턴으로 하위 호환성을 유지합니다:

// event-upcaster.ts
@Injectable()
export class EventUpcaster {
  private upcasters = new Map<string, (payload: any) => any>();

  constructor() {
    // v1 → v2: shippingAddress 필드 추가
    this.upcasters.set('OrderCreatedEvent_v1', (payload) => ({
      ...payload,
      schemaVersion: 2,
      shippingAddress: payload.address || { line1: '', city: '', zip: '' },
    }));
  }

  upcast(eventType: string, version: number, payload: any): any {
    const key = `${eventType}_v${version}`;
    const upcaster = this.upcasters.get(key);
    return upcaster ? upcaster(payload) : payload;
  }
}

7. Anti-Pattern 5가지

Anti-Pattern 문제 해결
Command에서 Query 호출 쓰기/읽기 결합, 테스트 어려움 필요한 데이터를 Command에 담기
이벤트에 과도한 데이터 이벤트 크기 폭증, 스키마 변경 지옥 변경된 것만 담기 (delta)
Saga에서 외부 API 직접 호출 실패 시 보상 불가 Command로 위임, Handler에서 호출
Read Model에서 Write DB 조회 CQRS 분리 무력화 프로젝션으로 Read DB 갱신
모든 도메인에 CQRS 적용 단순 CRUD에 과도한 복잡도 복잡한 도메인에만 선택적 적용

8. 실전 체크리스트

  • ✅ Command와 Query가 같은 Handler에 섞여 있지 않은가?
  • ✅ AggregateRoot의 불변식(invariant)이 Command Handler에서 검증되는가?
  • ✅ Saga 보상 트랜잭션이 모든 실패 시나리오를 커버하는가?
  • ✅ Event Sourcing 사용 시 스냅샷 주기가 설정되어 있는가?
  • ✅ Read Model 프로젝션이 멱등(idempotent)하게 구현되었는가?
  • ✅ 이벤트 스키마 버전 관리(Upcasting) 전략이 있는가?
  • ✅ 단순 CRUD 모듈에는 CQRS를 적용하지 않았는가?

💡 핵심 요약

CQRS는 복잡한 도메인에서 빛납니다. CommandBus로 의도를 명확히 하고, EventBus로 부수 효과를 분리하고, Saga로 프로세스를 오케스트레이션하세요. Event Sourcing은 감사·디버깅이 핵심일 때 결합하되, 모든 곳에 적용하지는 마세요.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux