NestJS에서 @nestjs/cqrs 패키지는 Command Query Responsibility Segregation 패턴을 프레임워크 수준에서 지원합니다. 단순 CRUD를 넘어 복잡한 도메인 로직을 다룰 때, CQRS는 쓰기(Command)와 읽기(Query)를 물리적으로 분리하여 각각 독립적으로 최적화할 수 있게 합니다.
1. CQRS 핵심 구조: CommandBus · QueryBus · EventBus
CQRS의 세 가지 버스는 각각 다른 책임을 갖습니다:
| 버스 | 역할 | 핸들러 | 반환값 |
|---|---|---|---|
| CommandBus | 상태 변경 (쓰기) | @CommandHandler | void 또는 결과 |
| QueryBus | 데이터 조회 (읽기) | @QueryHandler | 조회 결과 |
| EventBus | 부수 효과 전파 | @EventsHandler | void |
Command 정의와 Handler
// create-order.command.ts
export class CreateOrderCommand {
constructor(
public readonly userId: string,
public readonly items: OrderItemDto[],
public readonly shippingAddress: AddressDto,
) {}
}
// create-order.handler.ts
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepo: OrderRepository,
private readonly eventBus: EventBus,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
const order = Order.create(
command.userId,
command.items,
command.shippingAddress,
);
await this.orderRepo.save(order);
// 도메인 이벤트 발행
order.getUncommittedEvents().forEach(event =>
this.eventBus.publish(event),
);
return order.id;
}
}
Query 정의와 Handler
// get-order.query.ts
export class GetOrderQuery {
constructor(public readonly orderId: string) {}
}
@QueryHandler(GetOrderQuery)
export class GetOrderHandler implements IQueryHandler<GetOrderQuery> {
constructor(private readonly readRepo: OrderReadRepository) {}
async execute(query: GetOrderQuery): Promise<OrderReadModel> {
// 읽기 전용 모델에서 조회 — 쓰기 DB와 분리 가능
return this.readRepo.findById(query.orderId);
}
}
모듈 등록
@Module({
imports: [CqrsModule],
providers: [
CreateOrderHandler,
GetOrderHandler,
OrderCreatedHandler, // EventsHandler
],
})
export class OrderModule {}
2. AggregateRoot와 도메인 이벤트
AggregateRoot를 상속하면 엔티티 내부에서 이벤트를 쌓아두고 한 번에 발행할 수 있습니다:
export class Order extends AggregateRoot {
private status: OrderStatus;
private items: OrderItem[];
static create(userId: string, items: OrderItemDto[], address: AddressDto): Order {
const order = new Order();
order.id = uuid();
order.status = OrderStatus.CREATED;
// 이벤트를 내부 버퍼에 쌓기
order.apply(new OrderCreatedEvent(order.id, userId, items, address));
return order;
}
confirm() {
if (this.status !== OrderStatus.CREATED) {
throw new IllegalStateException('확인 불가 상태');
}
this.apply(new OrderConfirmedEvent(this.id));
}
// apply()가 호출하는 이벤트 핸들러 (선택적)
onOrderCreatedEvent(event: OrderCreatedEvent) {
this.items = event.items.map(i => OrderItem.from(i));
}
onOrderConfirmedEvent(event: OrderConfirmedEvent) {
this.status = OrderStatus.CONFIRMED;
}
}
apply()는 내부 버퍼에 이벤트를 쌓을 뿐, 즉시 발행하지 않습니다. commit() 또는 mergeObjectContext()를 통해 EventBus에 실제 발행해야 합니다.
3. Saga 오케스트레이션
Saga는 여러 Aggregate를 걸치는 비즈니스 프로세스를 오케스트레이션합니다. NestJS에서는 RxJS Observable을 반환하는 @Saga() 데코레이터로 구현합니다:
@Injectable()
export class OrderSaga {
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map(event => new ReserveInventoryCommand(event.orderId, event.items)),
);
};
@Saga()
inventoryReserved = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(InventoryReservedEvent),
map(event => new ProcessPaymentCommand(event.orderId, event.totalAmount)),
);
};
// 보상 트랜잭션: 결제 실패 시 재고 복원
@Saga()
paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentFailedEvent),
map(event => new ReleaseInventoryCommand(event.orderId)),
);
};
}
| Saga 패턴 | 특징 | 적합한 상황 |
|---|---|---|
| 오케스트레이션 | 중앙 조정자가 흐름 제어 | 복잡한 보상 로직, 순서 보장 |
| 코레오그래피 | 각 서비스가 이벤트에 반응 | 단순한 파이프라인, 느슨한 결합 |
4. Event Sourcing 통합
Event Sourcing은 현재 상태 대신 이벤트의 시퀀스를 저장합니다. 모든 상태 변화가 이벤트로 기록되므로 완벽한 감사 로그와 시간 여행 디버깅이 가능합니다:
// event-store.service.ts
@Injectable()
export class EventStoreService {
constructor(
@InjectRepository(StoredEvent)
private readonly eventRepo: Repository<StoredEvent>,
) {}
async saveEvents(aggregateId: string, events: IEvent[], expectedVersion: number) {
// 낙관적 동시성 제어
const currentVersion = await this.getCurrentVersion(aggregateId);
if (currentVersion !== expectedVersion) {
throw new ConcurrencyException(
`Expected version ${expectedVersion}, got ${currentVersion}`,
);
}
const storedEvents = events.map((event, i) => ({
aggregateId,
version: expectedVersion + i + 1,
eventType: event.constructor.name,
payload: JSON.stringify(event),
occurredAt: new Date(),
}));
await this.eventRepo.save(storedEvents);
}
async getEvents(aggregateId: string): Promise<StoredEvent[]> {
return this.eventRepo.find({
where: { aggregateId },
order: { version: 'ASC' },
});
}
}
// Aggregate 복원
async loadAggregate(id: string): Promise<Order> {
const events = await this.eventStore.getEvents(id);
const order = new Order();
events.forEach(stored => {
const event = this.deserialize(stored);
order.apply(event, { fromHistory: true }); // replay
});
return order;
}
스냅샷 최적화
이벤트가 수백 개 쌓이면 복원이 느려집니다. 주기적으로 스냅샷을 저장하고, 스냅샷 이후 이벤트만 재생하는 방식으로 최적화합니다:
async loadWithSnapshot(id: string): Promise<Order> {
const snapshot = await this.snapshotRepo.findLatest(id);
const fromVersion = snapshot ? snapshot.version : 0;
const events = await this.eventStore.getEventsAfter(id, fromVersion);
const order = snapshot
? Order.fromSnapshot(snapshot.data)
: new Order();
events.forEach(e => order.apply(this.deserialize(e), { fromHistory: true }));
return order;
}
// 매 100번째 이벤트마다 스냅샷 저장
if (order.version % 100 === 0) {
await this.snapshotRepo.save({
aggregateId: id,
version: order.version,
data: order.toSnapshot(),
});
}
5. Read Model 프로젝션
이벤트를 구독하여 읽기 전용 모델(Read Model)을 비동기로 갱신합니다. 읽기 모델은 조회에 최적화된 구조(비정규화, 캐시 등)로 자유롭게 설계할 수 있습니다:
// order-projection.handler.ts
@EventsHandler(OrderCreatedEvent, OrderConfirmedEvent, OrderShippedEvent)
export class OrderProjectionHandler
implements IEventHandler<OrderCreatedEvent | OrderConfirmedEvent | OrderShippedEvent>
{
constructor(
private readonly readRepo: OrderReadRepository,
private readonly statsRepo: OrderStatsRepository,
) {}
async handle(event: OrderCreatedEvent | OrderConfirmedEvent | OrderShippedEvent) {
if (event instanceof OrderCreatedEvent) {
// 상세 Read Model
await this.readRepo.upsert({
orderId: event.orderId,
userId: event.userId,
status: 'CREATED',
itemCount: event.items.length,
totalAmount: event.items.reduce((sum, i) => sum + i.price * i.qty, 0),
createdAt: event.occurredAt,
});
// 통계 Read Model
await this.statsRepo.incrementDailyOrders(event.occurredAt);
}
if (event instanceof OrderConfirmedEvent) {
await this.readRepo.updateStatus(event.orderId, 'CONFIRMED');
await this.statsRepo.incrementConfirmed(event.occurredAt);
}
if (event instanceof OrderShippedEvent) {
await this.readRepo.updateStatus(event.orderId, 'SHIPPED');
}
}
}
| 프로젝션 전략 | 설명 | 지연 |
|---|---|---|
| 동기 프로젝션 | 같은 트랜잭션에서 갱신 | 0ms (강한 일관성) |
| 비동기 프로젝션 | EventBus → 별도 핸들러 | ms~초 (최종 일관성) |
| Catch-up 프로젝션 | 이벤트 스토어 폴링 | 초~분 (재구축 가능) |
6. 이벤트 스키마 버전 관리
시간이 지나면 이벤트 구조가 변합니다. Upcasting 패턴으로 하위 호환성을 유지합니다:
// event-upcaster.ts
@Injectable()
export class EventUpcaster {
private upcasters = new Map<string, (payload: any) => any>();
constructor() {
// v1 → v2: shippingAddress 필드 추가
this.upcasters.set('OrderCreatedEvent_v1', (payload) => ({
...payload,
schemaVersion: 2,
shippingAddress: payload.address || { line1: '', city: '', zip: '' },
}));
}
upcast(eventType: string, version: number, payload: any): any {
const key = `${eventType}_v${version}`;
const upcaster = this.upcasters.get(key);
return upcaster ? upcaster(payload) : payload;
}
}
7. Anti-Pattern 5가지
| Anti-Pattern | 문제 | 해결 |
|---|---|---|
| Command에서 Query 호출 | 쓰기/읽기 결합, 테스트 어려움 | 필요한 데이터를 Command에 담기 |
| 이벤트에 과도한 데이터 | 이벤트 크기 폭증, 스키마 변경 지옥 | 변경된 것만 담기 (delta) |
| Saga에서 외부 API 직접 호출 | 실패 시 보상 불가 | Command로 위임, Handler에서 호출 |
| Read Model에서 Write DB 조회 | CQRS 분리 무력화 | 프로젝션으로 Read DB 갱신 |
| 모든 도메인에 CQRS 적용 | 단순 CRUD에 과도한 복잡도 | 복잡한 도메인에만 선택적 적용 |
8. 실전 체크리스트
- ✅ Command와 Query가 같은 Handler에 섞여 있지 않은가?
- ✅ AggregateRoot의 불변식(invariant)이 Command Handler에서 검증되는가?
- ✅ Saga 보상 트랜잭션이 모든 실패 시나리오를 커버하는가?
- ✅ Event Sourcing 사용 시 스냅샷 주기가 설정되어 있는가?
- ✅ Read Model 프로젝션이 멱등(idempotent)하게 구현되었는가?
- ✅ 이벤트 스키마 버전 관리(Upcasting) 전략이 있는가?
- ✅ 단순 CRUD 모듈에는 CQRS를 적용하지 않았는가?
💡 핵심 요약
CQRS는 복잡한 도메인에서 빛납니다. CommandBus로 의도를 명확히 하고, EventBus로 부수 효과를 분리하고, Saga로 프로세스를 오케스트레이션하세요. Event Sourcing은 감사·디버깅이 핵심일 때 결합하되, 모든 곳에 적용하지는 마세요.