왜 큐가 필요한가: HTTP 요청 밖에서 일을 처리하는 이유
API 요청 안에서 이메일 발송, 이미지 리사이징, PDF 생성, 외부 API 호출 같은 무거운 작업을 동기적으로 처리하면 응답 시간이 길어지고, 실패 시 재시도가 불가능합니다. 큐(Queue)는 이런 작업을 비동기로 분리하여 API는 즉시 응답하고, 백그라운드 워커가 별도로 처리합니다.
NestJS는 @nestjs/bullmq 패키지를 통해 BullMQ(Redis 기반 큐 라이브러리)와 공식 통합을 제공합니다. 레거시 @nestjs/bull(Bull 4.x)도 있지만, NestJS 10+에서는 BullMQ 기반의 @nestjs/bullmq가 권장됩니다.
설치와 기본 설정
# BullMQ 기반 (권장)
npm install @nestjs/bullmq bullmq
# 또는 레거시 Bull 기반
# npm install @nestjs/bull bull
// app.module.ts
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
// password: process.env.REDIS_PASSWORD,
},
}),
// 큐 등록
BullModule.registerQueue({
name: 'email', // 큐 이름
}),
BullModule.registerQueue({
name: 'image-processing',
defaultJobOptions: {
attempts: 3, // 기본 재시도 횟수
backoff: {
type: 'exponential',
delay: 1000, // 1초부터 지수 백오프
},
removeOnComplete: 100, // 완료된 작업 100개만 유지
removeOnFail: 500, // 실패한 작업 500개 유지
},
}),
EmailModule,
ImageModule,
],
})
export class AppModule {}
Producer: 큐에 작업 추가하기
Producer는 큐에 작업(Job)을 추가하는 역할입니다. @InjectQueue() 데코레이터로 큐 인스턴스를 주입받습니다.
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@Injectable()
export class EmailService {
constructor(
@InjectQueue('email') private readonly emailQueue: Queue,
) {}
async sendWelcomeEmail(userId: number, email: string) {
// 큐에 작업 추가 — 즉시 반환
await this.emailQueue.add(
'welcome', // Job 이름 (라우팅에 사용)
{ userId, email }, // Job 데이터 (직렬화 가능해야 함)
{
priority: 1, // 높은 우선순위 (낮을수록 우선)
delay: 5000, // 5초 후 실행
},
);
}
async sendBulkEmails(users: Array<{ id: number; email: string }>) {
// 대량 작업 추가 — addBulk으로 한 번에 추가
const jobs = users.map(user => ({
name: 'newsletter',
data: { userId: user.id, email: user.email },
opts: { priority: 5 },
}));
await this.emailQueue.addBulk(jobs);
}
}
Consumer (Processor): 큐 작업 처리하기
Consumer는 @Processor() 데코레이터가 붙은 클래스로, BullMQ의 Worker를 NestJS DI 시스템에 통합합니다.
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
@Processor('email')
export class EmailProcessor extends WorkerHost {
private readonly logger = new Logger(EmailProcessor.name);
async process(job: Job<{ userId: number; email: string }>): Promise<any> {
this.logger.log(`Processing job ${job.id}: ${job.name}`);
switch (job.name) {
case 'welcome':
return this.sendWelcome(job.data);
case 'newsletter':
return this.sendNewsletter(job.data);
default:
throw new Error(`Unknown job name: ${job.name}`);
}
}
private async sendWelcome(data: { userId: number; email: string }) {
// 실제 이메일 발송 로직
await this.mailerService.send({
to: data.email,
subject: '환영합니다!',
template: 'welcome',
});
return { sent: true };
}
private async sendNewsletter(data: { userId: number; email: string }) {
// 뉴스레터 발송 로직
await this.mailerService.send({
to: data.email,
subject: '주간 뉴스레터',
template: 'newsletter',
});
return { sent: true };
}
}
핵심: @Processor('email')는 'email' 큐의 Worker를 생성합니다. process() 메서드가 각 Job을 순차적으로(기본 concurrency=1) 처리합니다.
Job Options 완전 정리: 재시도·지연·우선순위·TTL
| 옵션 | 타입 | 기본값 | 설명 |
|---|---|---|---|
attempts |
number | 0 (재시도 없음) | 최대 시도 횟수 (1=한 번만, 3=원본+재시도2) |
backoff |
object | 없음 | { type: 'exponential' | 'fixed', delay: ms } |
delay |
number | 0 | 밀리초 후 실행 (예약 작업) |
priority |
number | 0 | 낮을수록 높은 우선순위 |
removeOnComplete |
boolean | number | false | true=즉시 삭제, number=최대 N개 유지 |
removeOnFail |
boolean | number | false | 실패한 Job 보존 정책 |
jobId |
string | 자동 생성 | 커스텀 ID — 중복 방지에 활용 |
repeat |
object | 없음 | 반복 실행 (크론 대체) |
// 실무 Job Options 예시
await this.emailQueue.add('welcome', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000, // 2s → 4s → 8s → 16s → 32s
},
removeOnComplete: {
age: 3600, // 완료 후 1시간 보존
count: 1000, // 최대 1000개 보존
},
removeOnFail: {
age: 86400 * 7, // 실패 후 7일 보존
},
});
Concurrency: 병렬 처리 제어
// 동시에 5개 Job을 병렬 처리
@Processor('image-processing', {
concurrency: 5,
})
export class ImageProcessor extends WorkerHost {
async process(job: Job) {
// CPU/메모리 집중 작업
return this.resizeImage(job.data.imageUrl, job.data.sizes);
}
}
| concurrency | 적합한 작업 | 주의사항 |
|---|---|---|
| 1 (기본) | 순서 보장 필요, 외부 API rate limit | 처리량 제한 |
| 5~10 | 이메일 발송, HTTP 호출 | 외부 서비스 부하 고려 |
| CPU 코어 수 | 이미지/비디오 처리 | Node.js 이벤트 루프 블로킹 주의 |
이벤트 리스닝: Job 생명주기 추적
BullMQ의 Worker는 다양한 이벤트를 발생시킵니다. @OnWorkerEvent() 데코레이터로 Processor 클래스 안에서 이벤트를 처리할 수 있습니다.
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('email')
export class EmailProcessor extends WorkerHost {
async process(job: Job): Promise<any> {
// 처리 로직
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(`Job ${job.id} completed. Result: ${JSON.stringify(job.returnvalue)}`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, error: Error) {
this.logger.error(
`Job ${job.id} failed after ${job.attemptsMade} attempts: ${error.message}`,
);
// Slack/Discord 알림, 모니터링 시스템에 보고
this.alertService.notify(`Email job failed: ${job.id}`);
}
@OnWorkerEvent('progress')
onProgress(job: Job, progress: number) {
this.logger.log(`Job ${job.id} progress: ${progress}%`);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`Job ${jobId} stalled — worker may have crashed`);
}
}
주요 이벤트 정리
| 이벤트 | 발생 시점 | 활용 |
|---|---|---|
completed |
Job 성공 완료 | 후속 작업 트리거, 로깅 |
failed |
모든 재시도 소진 후 최종 실패 | 알림, DLQ 처리 |
progress |
Job 내에서 job.updateProgress() 호출 시 |
진행률 표시 |
stalled |
Worker가 lock 갱신 실패 (크래시 추정) | 모니터링, 재할당 |
Repeatable Jobs: 크론 대체 스케줄링
BullMQ의 repeat 옵션으로 주기적 작업을 스케줄링할 수 있습니다. @nestjs/schedule과 달리 Redis에 상태가 저장되므로 여러 인스턴스에서 중복 실행되지 않습니다.
// 모듈 초기화 시 Repeatable Job 등록
@Injectable()
export class ScheduleSetupService implements OnModuleInit {
constructor(
@InjectQueue('maintenance') private readonly maintenanceQueue: Queue,
) {}
async onModuleInit() {
// 매일 자정에 실행
await this.maintenanceQueue.add(
'cleanup-expired-sessions',
{}, // data
{
repeat: {
pattern: '0 0 * * *', // 크론 표현식
tz: 'Asia/Seoul', // 타임존
},
jobId: 'cleanup-sessions', // 고정 ID로 중복 등록 방지
},
);
// 5분마다 실행
await this.maintenanceQueue.add(
'health-check',
{},
{
repeat: {
every: 300000, // 300초 = 5분
},
jobId: 'health-check',
},
);
}
}
중복 방지: jobId를 고정하면 애플리케이션이 재시작되어도 같은 ID의 repeatable job이 중복 등록되지 않습니다. BullMQ는 동일한 repeat 설정 + jobId 조합은 하나만 유지합니다.
Job Progress: 대용량 작업의 진행률 추적
@Processor('export')
export class ExportProcessor extends WorkerHost {
async process(job: Job<{ userId: number; format: string }>) {
const totalRecords = await this.getRecordCount(job.data.userId);
let processed = 0;
for await (const batch of this.getBatches(job.data.userId, 1000)) {
await this.exportBatch(batch, job.data.format);
processed += batch.length;
// 진행률 업데이트 (0~100)
await job.updateProgress(Math.round((processed / totalRecords) * 100));
}
return { totalExported: processed };
}
}
// API에서 진행률 조회
@Controller('exports')
export class ExportController {
constructor(@InjectQueue('export') private readonly exportQueue: Queue) {}
@Get(':jobId/progress')
async getProgress(@Param('jobId') jobId: string) {
const job = await this.exportQueue.getJob(jobId);
if (!job) throw new NotFoundException();
return {
state: await job.getState(), // 'waiting' | 'active' | 'completed' | 'failed'
progress: job.progress, // 0~100
result: job.returnvalue,
};
}
}
Separate Processes: CPU 집약 작업의 격리
Node.js는 단일 스레드이므로, CPU 집약적인 Job이 이벤트 루프를 블로킹하면 HTTP 요청 처리도 멈춥니다. BullMQ는 별도 프로세스(sandboxed processor)에서 Job을 실행하는 기능을 제공합니다.
// image.processor.ts — 별도 파일로 분리
// 이 파일은 NestJS DI 컨텍스트 밖에서 실행됨
import { SandboxedJob } from 'bullmq';
export default async function (job: SandboxedJob) {
// CPU 집약적 이미지 처리
const sharp = require('sharp');
const result = await sharp(job.data.buffer)
.resize(job.data.width, job.data.height)
.toBuffer();
return { size: result.length };
}
// 모듈에서 sandboxed processor 등록
BullModule.registerQueue({
name: 'image-processing',
processors: [{
path: join(__dirname, 'image.processor.js'),
concurrency: 4,
}],
})
주의: Sandboxed processor는 NestJS DI 컨테이너에 접근할 수 없습니다. 서비스 주입이 필요하면 WorkerHost를 사용하고, 순수 CPU 작업만 sandboxed로 분리합니다.
Redis 연결 관리와 장애 대응
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null, // BullMQ 필수: null로 설정
enableReadyCheck: false, // 클러스터 환경에서 권장
},
})
필수 설정: maxRetriesPerRequest: null은 BullMQ가 Redis 연결이 끊겼을 때 무한히 재연결을 시도하도록 합니다. 이 값이 기본값(20)이면 연결 실패 시 Worker가 중지됩니다. BullMQ 공식 문서에서 이 설정을 필수로 요구합니다.
Bull Board: 웹 UI 대시보드
npm install @bull-board/api @bull-board/express @bull-board/nestjs
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
@Module({
imports: [
BullBoardModule.forRoot({
route: '/queues', // 대시보드 경로
adapter: ExpressAdapter,
}),
BullBoardModule.forFeature({
name: 'email',
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: 'image-processing',
adapter: BullMQAdapter,
}),
],
})
export class AppModule {}
// http://localhost:3000/queues 에서 대시보드 접근
Bull Board에서 확인 가능한 정보:
- 큐별 Waiting / Active / Completed / Failed / Delayed 작업 수
- 개별 Job의 데이터, 로그, 에러 스택트레이스
- 실패한 Job 수동 재시도
- Repeatable Job 관리
@nestjs/bull vs @nestjs/bullmq 비교
| 항목 | @nestjs/bull (레거시) | @nestjs/bullmq (권장) |
|---|---|---|
| 기반 라이브러리 | Bull 4.x | BullMQ 4.x+ |
| Processor 방식 | @Process() 데코레이터 |
WorkerHost 클래스 상속 |
| 이벤트 리스닝 | @OnQueueActive() 등 |
@OnWorkerEvent() |
| Flow (의존성 체인) | 미지원 | 지원 (FlowProducer) |
| Group Rate Limiting | 미지원 | 지원 |
| 유지보수 | 유지보수 모드 | 활발한 개발 |
운영 Best Practice 체크리스트
- removeOnComplete/removeOnFail 필수: 설정하지 않으면 Redis 메모리가 무한 증가합니다. 완료된 Job은 100~1000개 또는 시간 기반(age)으로 제한하세요.
- attempts + backoff 설정: 외부 API 호출 작업에는 최소 3회 재시도 + 지수 백오프를 설정하세요.
- jobId로 멱등성 보장: 동일한 jobId는 큐에 중복 추가되지 않습니다. 주문 ID 등 비즈니스 키를 jobId로 사용하면 중복 처리를 방지할 수 있습니다.
- Stalled Job 모니터링: Worker가 크래시하면 작업이 stalled 상태가 됩니다.
stalledInterval설정과 stalled 이벤트 모니터링을 추가하세요. - dedicated Redis 사용: 캐시용 Redis와 큐용 Redis를 분리하세요. 캐시 eviction으로 큐 데이터가 삭제되면 작업이 유실됩니다.
- Graceful Shutdown: 애플리케이션 종료 시 Worker가 진행 중인 Job을 완료할 수 있도록
enableShutdownHooks()를 설정하세요.
// main.ts — Graceful Shutdown
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.enableShutdownHooks(); // SIGTERM 시 Worker 정리
await app.listen(3000);
}
참고 자료