BullMQ란?
BullMQ는 Redis 기반의 Node.js 작업 큐 라이브러리입니다. NestJS의 @nestjs/bullmq 패키지를 통해 데코레이터 기반으로 큐 프로듀서/컨슈머를 선언적으로 구성할 수 있습니다. 이메일 발송, 이미지 처리, 결제 후처리 등 비동기 백그라운드 작업에 필수적인 인프라입니다.
Bull vs BullMQ
| 항목 | Bull (레거시) | BullMQ (현재) |
|---|---|---|
| Redis 명령 | 개별 명령 | Lua 스크립트 (원자적) |
| Flow (부모-자식) | 미지원 | 네이티브 지원 |
| Worker 스레드 | 프로세스 기반 | Worker Threads + Sandboxed |
| Rate Limiting | 기본 | 그룹별 세밀한 제어 |
| NestJS 패키지 | @nestjs/bull | @nestjs/bullmq |
설치 및 모듈 설정
npm install @nestjs/bullmq bullmq
// app.module.ts
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null, // BullMQ 필수 설정
},
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
}),
BullModule.registerQueue(
{ name: 'email' },
{ name: 'image-processing' },
{ name: 'payment-webhook' },
),
],
})
export class AppModule {}
프로듀서 — 작업 등록
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcomeEmail(userId: string, email: string) {
await this.emailQueue.add(
'welcome', // job name
{ userId, email, template: 'welcome' },
{
priority: 1, // 낮을수록 높은 우선순위
delay: 5000, // 5초 후 실행
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
},
);
}
async sendBulkEmails(recipients: EmailRecipient[]) {
const jobs = recipients.map((r) => ({
name: 'bulk-send',
data: { email: r.email, template: r.template },
opts: {
priority: 10,
// 중복 방지: 같은 이메일에 같은 템플릿은 1시간 내 재전송 금지
jobId: `bulk-${r.email}-${r.template}`,
},
}));
await this.emailQueue.addBulk(jobs);
}
}
컨슈머 — Processor와 Worker
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('email', {
concurrency: 5, // 동시 처리 수
limiter: { max: 100, duration: 60000 }, // 분당 100건 제한
})
export class EmailProcessor extends WorkerHost {
constructor(
private readonly mailer: MailerService,
private readonly userRepo: UserRepository,
) {
super();
}
async process(job: Job): Promise<any> {
switch (job.name) {
case 'welcome':
return this.handleWelcome(job);
case 'bulk-send':
return this.handleBulkSend(job);
default:
throw new Error(`Unknown job: ${job.name}`);
}
}
private async handleWelcome(job: Job) {
const { userId, email, template } = job.data;
// 진행률 업데이트
await job.updateProgress(10);
const user = await this.userRepo.findById(userId);
if (!user) throw new Error(`User ${userId} not found`);
await job.updateProgress(50);
await this.mailer.send({
to: email,
template,
context: { name: user.name },
});
await job.updateProgress(100);
return { sent: true, email };
}
// 이벤트 훅
@OnWorkerEvent('completed')
onCompleted(job: Job) {
console.log(`Job ${job.id} completed: ${job.returnvalue}`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, error: Error) {
console.error(`Job ${job.id} failed: ${error.message}`);
// Sentry/Slack 알림 등
}
}
Flow — 부모-자식 작업 체인
BullMQ의 가장 강력한 기능 중 하나는 Flow(작업 의존성 체인)입니다. 부모 작업이 모든 자식 작업 완료를 기다린 후 실행됩니다.
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';
@Injectable()
export class ImagePipelineService {
constructor(
@InjectFlowProducer('image-pipeline')
private flowProducer: FlowProducer,
) {}
async processUploadedImage(imageId: string, url: string) {
await this.flowProducer.add({
name: 'finalize',
queueName: 'image-processing',
data: { imageId },
children: [
{
name: 'generate-thumbnail',
queueName: 'image-processing',
data: { imageId, url, size: 'thumbnail' },
},
{
name: 'generate-webp',
queueName: 'image-processing',
data: { imageId, url, format: 'webp' },
},
{
name: 'extract-metadata',
queueName: 'image-processing',
data: { imageId, url },
},
],
});
// thumbnail + webp + metadata 모두 완료 → finalize 실행
}
}
모듈 등록 시 FlowProducer도 추가:
BullModule.registerFlowProducer({ name: 'image-pipeline' })
반복 작업 (Repeatable Jobs)
크론잡처럼 주기적으로 실행되는 작업도 BullMQ로 관리할 수 있습니다.
@Injectable()
export class SchedulerService implements OnModuleInit {
constructor(@InjectQueue('payment-webhook') private queue: Queue) {}
async onModuleInit() {
// 기존 반복 작업 정리 후 재등록
const repeatableJobs = await this.queue.getRepeatableJobs();
for (const job of repeatableJobs) {
await this.queue.removeRepeatableByKey(job.key);
}
// 매 5분마다 실패한 웹훅 재시도
await this.queue.add(
'retry-failed-webhooks',
{},
{ repeat: { pattern: '*/5 * * * *' } },
);
// 매일 자정 — 30일 이상 된 완료 작업 정리
await this.queue.add(
'cleanup-old-jobs',
{},
{ repeat: { pattern: '0 0 * * *' } },
);
}
}
에러 핸들링과 Dead Letter Queue
// 재시도 모두 실패 시 DLQ로 이동
@Processor('payment-webhook', { concurrency: 10 })
export class WebhookProcessor extends WorkerHost {
constructor(
@InjectQueue('dead-letter') private dlq: Queue,
) {
super();
}
async process(job: Job) {
try {
const response = await this.httpService.post(
job.data.webhookUrl,
job.data.payload,
{ timeout: 5000 },
);
if (response.status >= 400) {
throw new Error(`Webhook returned ${response.status}`);
}
return { status: response.status };
} catch (error) {
// 마지막 시도에서도 실패하면 DLQ로
if (job.attemptsMade >= (job.opts.attempts ?? 3) - 1) {
await this.dlq.add('webhook-failed', {
originalQueue: 'payment-webhook',
originalJobId: job.id,
data: job.data,
error: error.message,
failedAt: new Date().toISOString(),
});
}
throw error; // 재시도 트리거
}
}
}
모니터링 — Bull Board
프로덕션에서는 미들웨어로 Bull Board 대시보드를 통합합니다.
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
// main.ts
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(app.get(getQueueToken('email'))),
new BullMQAdapter(app.get(getQueueToken('image-processing'))),
new BullMQAdapter(app.get(getQueueToken('payment-webhook'))),
],
serverAdapter,
});
app.use('/admin/queues', authMiddleware, serverAdapter.getRouter());
운영 팁
- Redis 분리: 캐시용 Redis와 큐용 Redis를 분리해야
FLUSHALL사고 방지 - Graceful Shutdown:
enableShutdownHooks()로 진행 중인 작업 완료 후 종료 - Stalled Jobs: Worker가 죽으면 BullMQ가 자동으로 stalled 감지 → 재시도.
stalledInterval과maxStalledCount조정 - Sandboxed Processor: CPU 집약 작업은 별도 Worker Thread에서 실행해 이벤트 루프 차단 방지
- 테스트: Vitest에서
ioredis-mock으로 Redis 없이 큐 로직 단위 테스트 가능
정리
NestJS + BullMQ는 백그라운드 작업 처리의 표준 조합입니다. 단순 비동기 작업부터 Flow 기반 파이프라인, 반복 스케줄링, Dead Letter Queue까지 — 프로덕션 수준의 작업 큐 시스템을 데코레이터 기반으로 간결하게 구축할 수 있습니다. Redis의 신뢰성과 BullMQ의 원자적 Lua 스크립트가 결합되어, 작업 유실 없는 안정적인 비동기 처리를 보장합니다.