CQRS란?
CQRS(Command Query Responsibility Segregation)는 데이터의 쓰기(Command)와 읽기(Query)를 별도의 모델로 분리하는 패턴입니다. 전통적인 CRUD에서는 하나의 모델이 생성·조회·수정·삭제를 모두 담당하지만, CQRS에서는 Command 모델과 Query 모델을 독립적으로 설계합니다.
NestJS는 @nestjs/cqrs 패키지를 통해 Command Bus, Query Bus, Event Bus를 기본 제공합니다. 이 글에서는 실전 구현부터 이벤트 소싱 연동까지 다룹니다.
설치와 모듈 설정
npm install @nestjs/cqrs
// app.module.ts
import { CqrsModule } from '@nestjs/cqrs';
@Module({
imports: [CqrsModule], // CommandBus, QueryBus, EventBus 자동 등록
})
export class AppModule {}
Command: 쓰기 작업
Command는 “시스템의 상태를 변경하라”는 의도를 표현합니다. 반환값이 없거나 최소한의 결과(ID 등)만 반환합니다.
// commands/create-order.command.ts
export class CreateOrderCommand {
constructor(
public readonly userId: string,
public readonly items: { productId: string; quantity: number }[],
public readonly shippingAddress: string,
) {}
}
// commands/handlers/create-order.handler.ts
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepo: OrderRepository,
private readonly publisher: EventPublisher,
) {}
async execute(command: CreateOrderCommand): Promise<string> {
const { userId, items, shippingAddress } = command;
// 비즈니스 검증
const totalAmount = await this.calculateTotal(items);
if (totalAmount <= 0) {
throw new BadRequestException('주문 금액이 유효하지 않습니다');
}
// 엔티티 생성
const order = this.publisher.mergeObjectContext(
new Order(userId, items, shippingAddress, totalAmount),
);
await this.orderRepo.save(order);
// 도메인 이벤트 발행
order.commit();
return order.id;
}
private async calculateTotal(items: { productId: string; quantity: number }[]) {
// 상품 가격 조회 및 합산 로직
return items.reduce((sum, item) => sum + item.quantity * 10000, 0);
}
}
컨트롤러에서 Command 실행
@Controller()
export class OrderController {
constructor(private readonly commandBus: CommandBus) {}
@Post()
async createOrder(@Body() dto: CreateOrderDto, @User() user: AuthUser) {
const orderId = await this.commandBus.execute(
new CreateOrderCommand(user.id, dto.items, dto.shippingAddress),
);
return { orderId };
}
}
Query: 읽기 작업
Query는 “데이터를 조회하라”는 요청입니다. 시스템 상태를 변경하지 않으며, 읽기에 최적화된 모델을 사용할 수 있습니다.
// queries/get-order-detail.query.ts
export class GetOrderDetailQuery {
constructor(
public readonly orderId: string,
public readonly userId: string,
) {}
}
// queries/handlers/get-order-detail.handler.ts
@QueryHandler(GetOrderDetailQuery)
export class GetOrderDetailHandler implements IQueryHandler<GetOrderDetailQuery> {
constructor(
private readonly readRepo: OrderReadRepository, // 읽기 전용 저장소
) {}
async execute(query: GetOrderDetailQuery): Promise<OrderDetailDto> {
const order = await this.readRepo.findDetailView(query.orderId);
if (!order) {
throw new NotFoundException(`Order ${query.orderId} not found`);
}
// 권한 확인
if (order.userId !== query.userId) {
throw new ForbiddenException('접근 권한이 없습니다');
}
return order;
}
}
// queries/handlers/list-orders.handler.ts
export class ListOrdersQuery {
constructor(
public readonly userId: string,
public readonly page: number = 1,
public readonly limit: number = 20,
public readonly status?: OrderStatus,
) {}
}
@QueryHandler(ListOrdersQuery)
export class ListOrdersHandler implements IQueryHandler<ListOrdersQuery> {
constructor(private readonly readRepo: OrderReadRepository) {}
async execute(query: ListOrdersQuery): Promise<PaginatedResult<OrderSummaryDto>> {
return this.readRepo.findByUserPaginated(
query.userId,
query.page,
query.limit,
query.status,
);
}
}
// 컨트롤러에서 사용
@Get(':id')
async getOrder(@Param('id') id: string, @User() user: AuthUser) {
return this.queryBus.execute(new GetOrderDetailQuery(id, user.id));
}
Event: 도메인 이벤트
Command 실행 후 발생하는 부수 효과를 Event로 처리합니다. 이메일 발송, 알림, 통계 업데이트 등을 Command Handler에서 분리합니다.
// events/order-created.event.ts
export class OrderCreatedEvent {
constructor(
public readonly orderId: string,
public readonly userId: string,
public readonly totalAmount: number,
public readonly items: { productId: string; quantity: number }[],
) {}
}
// Order 엔티티 (AggregateRoot 상속)
import { AggregateRoot } from '@nestjs/cqrs';
export class Order extends AggregateRoot {
constructor(userId, items, address, amount) {
super();
this.id = generateId();
this.userId = userId;
this.items = items;
this.address = address;
this.amount = amount;
this.status = OrderStatus.CREATED;
// 도메인 이벤트 등록 (commit() 호출 시 발행)
this.apply(new OrderCreatedEvent(this.id, userId, amount, items));
}
cancel(reason: string) {
if (this.status !== OrderStatus.CREATED) {
throw new Error('배송 시작 후에는 취소할 수 없습니다');
}
this.status = OrderStatus.CANCELLED;
this.apply(new OrderCancelledEvent(this.id, this.userId, reason));
}
}
// events/handlers/order-created.handler.ts
@EventsHandler(OrderCreatedEvent)
export class OrderCreatedHandler implements IEventHandler<OrderCreatedEvent> {
constructor(
private readonly mailService: MailService,
private readonly inventoryService: InventoryService,
private readonly analyticsService: AnalyticsService,
) {}
async handle(event: OrderCreatedEvent) {
// 여러 부수 효과를 병렬 처리
await Promise.allSettled([
this.mailService.sendOrderConfirmation(event.userId, event.orderId),
this.inventoryService.reserveStock(event.items),
this.analyticsService.trackOrder(event),
]);
}
}
핵심: Event Handler가 실패해도 Command 자체는 이미 성공한 상태입니다. Promise.allSettled로 하나의 실패가 다른 처리를 막지 않도록 합니다.
Saga: 복잡한 워크플로우
여러 Command를 순차적으로 실행하는 긴 프로세스를 Saga로 관리합니다.
@Injectable()
export class OrderSaga {
constructor(private readonly commandBus: CommandBus) {}
// 주문 생성 → 결제 요청
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map(event => new ProcessPaymentCommand(
event.orderId,
event.userId,
event.totalAmount,
)),
);
};
// 결제 성공 → 배송 요청
@Saga()
paymentProcessed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentProcessedEvent),
map(event => new RequestShipmentCommand(event.orderId)),
);
};
// 결제 실패 → 주문 취소
@Saga()
paymentFailed = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(PaymentFailedEvent),
map(event => new CancelOrderCommand(event.orderId, '결제 실패')),
);
};
}
모듈 구성 패턴
// order.module.ts
const CommandHandlers = [CreateOrderHandler, CancelOrderHandler];
const QueryHandlers = [GetOrderDetailHandler, ListOrdersHandler];
const EventHandlers = [OrderCreatedHandler, OrderCancelledHandler];
const Sagas = [OrderSaga];
@Module({
imports: [CqrsModule],
controllers: [OrderController],
providers: [
...CommandHandlers,
...QueryHandlers,
...EventHandlers,
...Sagas,
OrderRepository,
OrderReadRepository,
],
})
export class OrderModule {}
읽기/쓰기 저장소 분리
CQRS의 진정한 힘은 읽기와 쓰기에 다른 저장소를 사용할 때 나옵니다.
| 구분 | 쓰기 모델 | 읽기 모델 |
|---|---|---|
| 저장소 | PostgreSQL (정규화) | MongoDB / Elasticsearch (비정규화) |
| 최적화 | 데이터 무결성, 트랜잭션 | 조회 속도, 검색 |
| 동기화 | Event Handler에서 읽기 모델 업데이트 (Eventual Consistency) | |
// 읽기 모델 동기화 — Event Handler에서
@EventsHandler(OrderCreatedEvent)
export class OrderProjectionHandler implements IEventHandler<OrderCreatedEvent> {
constructor(
@InjectModel('OrderView') private readonly orderViewModel: Model<OrderView>,
) {}
async handle(event: OrderCreatedEvent) {
// MongoDB에 비정규화된 뷰 저장
await this.orderViewModel.create({
orderId: event.orderId,
userId: event.userId,
totalAmount: event.totalAmount,
itemCount: event.items.length,
status: 'CREATED',
createdAt: new Date(),
});
}
}
테스트 전략
describe('CreateOrderHandler', () => {
let handler: CreateOrderHandler;
let orderRepo: jest.Mocked<OrderRepository>;
let publisher: EventPublisher;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
CreateOrderHandler,
{ provide: OrderRepository, useValue: { save: jest.fn() } },
{ provide: EventPublisher, useValue: { mergeObjectContext: jest.fn(o => o) } },
],
}).compile();
handler = module.get(CreateOrderHandler);
orderRepo = module.get(OrderRepository);
});
it('주문을 생성하고 ID를 반환한다', async () => {
const command = new CreateOrderCommand('user-1', [
{ productId: 'prod-1', quantity: 2 },
], '서울시 강남구');
const orderId = await handler.execute(command);
expect(orderId).toBeDefined();
expect(orderRepo.save).toHaveBeenCalledTimes(1);
});
});
Command Handler와 Query Handler는 독립적으로 테스트할 수 있습니다. NestJS 테스트 전략에서 다룬 모킹 패턴을 활용하세요. Saga 테스트는 RxJS의 TestScheduler를 사용합니다.
CQRS 도입 기준
| 도입 적합 | 과도한 경우 |
|---|---|
| 읽기/쓰기 비율이 크게 다름 | 단순 CRUD 앱 |
| 복잡한 비즈니스 로직·워크플로우 | 팀 규모가 작고 도메인이 단순 |
| 읽기/쓰기 독립 스케일링 필요 | 데이터 정합성이 즉시 필요 (금융 원장) |
CQRS는 Spring Event 비동기 처리와 유사한 이벤트 기반 아키텍처이지만, 읽기/쓰기 모델 분리라는 추가 차원이 있습니다. 복잡도와 이점을 저울질한 후 도입하세요.
정리
NestJS CQRS의 핵심은 “명령과 조회를 분리하고, 이벤트로 연결하라”입니다. CommandBus로 쓰기, QueryBus로 읽기, EventBus로 부수 효과를 처리하며, Saga로 복잡한 워크플로우를 조율합니다. @nestjs/cqrs의 AggregateRoot, EventPublisher, Saga 데코레이터를 활용하면 도메인 이벤트 기반 아키텍처를 깔끔하게 구현할 수 있습니다.