NestJS BullMQ 작업 큐 심화

BullMQ란?

BullMQ는 Redis 기반의 Node.js 작업 큐 라이브러리입니다. NestJS의 @nestjs/bullmq 패키지를 통해 데코레이터 기반으로 큐 프로듀서/컨슈머를 선언적으로 구성할 수 있습니다. 이메일 발송, 이미지 처리, 결제 후처리 등 비동기 백그라운드 작업에 필수적인 인프라입니다.

Bull vs BullMQ

항목 Bull (레거시) BullMQ (현재)
Redis 명령 개별 명령 Lua 스크립트 (원자적)
Flow (부모-자식) 미지원 네이티브 지원
Worker 스레드 프로세스 기반 Worker Threads + Sandboxed
Rate Limiting 기본 그룹별 세밀한 제어
NestJS 패키지 @nestjs/bull @nestjs/bullmq

설치 및 모듈 설정

npm install @nestjs/bullmq bullmq
// app.module.ts
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
        password: process.env.REDIS_PASSWORD,
        maxRetriesPerRequest: null, // BullMQ 필수 설정
      },
      defaultJobOptions: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
        removeOnComplete: { count: 1000 },
        removeOnFail: { count: 5000 },
      },
    }),
    BullModule.registerQueue(
      { name: 'email' },
      { name: 'image-processing' },
      { name: 'payment-webhook' },
    ),
  ],
})
export class AppModule {}

프로듀서 — 작업 등록

import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class EmailService {
  constructor(@InjectQueue('email') private emailQueue: Queue) {}

  async sendWelcomeEmail(userId: string, email: string) {
    await this.emailQueue.add(
      'welcome',  // job name
      { userId, email, template: 'welcome' },
      {
        priority: 1,           // 낮을수록 높은 우선순위
        delay: 5000,           // 5초 후 실행
        attempts: 5,
        backoff: { type: 'exponential', delay: 2000 },
      },
    );
  }

  async sendBulkEmails(recipients: EmailRecipient[]) {
    const jobs = recipients.map((r) => ({
      name: 'bulk-send',
      data: { email: r.email, template: r.template },
      opts: { 
        priority: 10,
        // 중복 방지: 같은 이메일에 같은 템플릿은 1시간 내 재전송 금지
        jobId: `bulk-${r.email}-${r.template}`,
      },
    }));
    
    await this.emailQueue.addBulk(jobs);
  }
}

컨슈머 — Processor와 Worker

import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('email', {
  concurrency: 5,            // 동시 처리 수
  limiter: { max: 100, duration: 60000 },  // 분당 100건 제한
})
export class EmailProcessor extends WorkerHost {
  constructor(
    private readonly mailer: MailerService,
    private readonly userRepo: UserRepository,
  ) {
    super();
  }

  async process(job: Job): Promise<any> {
    switch (job.name) {
      case 'welcome':
        return this.handleWelcome(job);
      case 'bulk-send':
        return this.handleBulkSend(job);
      default:
        throw new Error(`Unknown job: ${job.name}`);
    }
  }

  private async handleWelcome(job: Job) {
    const { userId, email, template } = job.data;
    
    // 진행률 업데이트
    await job.updateProgress(10);
    
    const user = await this.userRepo.findById(userId);
    if (!user) throw new Error(`User ${userId} not found`);
    
    await job.updateProgress(50);
    
    await this.mailer.send({
      to: email,
      template,
      context: { name: user.name },
    });
    
    await job.updateProgress(100);
    return { sent: true, email };
  }

  // 이벤트 훅
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    console.log(`Job ${job.id} completed: ${job.returnvalue}`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, error: Error) {
    console.error(`Job ${job.id} failed: ${error.message}`);
    // Sentry/Slack 알림 등
  }
}

Flow — 부모-자식 작업 체인

BullMQ의 가장 강력한 기능 중 하나는 Flow(작업 의존성 체인)입니다. 부모 작업이 모든 자식 작업 완료를 기다린 후 실행됩니다.

import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';

@Injectable()
export class ImagePipelineService {
  constructor(
    @InjectFlowProducer('image-pipeline') 
    private flowProducer: FlowProducer,
  ) {}

  async processUploadedImage(imageId: string, url: string) {
    await this.flowProducer.add({
      name: 'finalize',
      queueName: 'image-processing',
      data: { imageId },
      children: [
        {
          name: 'generate-thumbnail',
          queueName: 'image-processing',
          data: { imageId, url, size: 'thumbnail' },
        },
        {
          name: 'generate-webp',
          queueName: 'image-processing',
          data: { imageId, url, format: 'webp' },
        },
        {
          name: 'extract-metadata',
          queueName: 'image-processing',
          data: { imageId, url },
        },
      ],
    });
    // thumbnail + webp + metadata 모두 완료 → finalize 실행
  }
}

모듈 등록 시 FlowProducer도 추가:

BullModule.registerFlowProducer({ name: 'image-pipeline' })

반복 작업 (Repeatable Jobs)

크론잡처럼 주기적으로 실행되는 작업도 BullMQ로 관리할 수 있습니다.

@Injectable()
export class SchedulerService implements OnModuleInit {
  constructor(@InjectQueue('payment-webhook') private queue: Queue) {}

  async onModuleInit() {
    // 기존 반복 작업 정리 후 재등록
    const repeatableJobs = await this.queue.getRepeatableJobs();
    for (const job of repeatableJobs) {
      await this.queue.removeRepeatableByKey(job.key);
    }

    // 매 5분마다 실패한 웹훅 재시도
    await this.queue.add(
      'retry-failed-webhooks',
      {},
      { repeat: { pattern: '*/5 * * * *' } },
    );

    // 매일 자정 — 30일 이상 된 완료 작업 정리
    await this.queue.add(
      'cleanup-old-jobs',
      {},
      { repeat: { pattern: '0 0 * * *' } },
    );
  }
}

에러 핸들링과 Dead Letter Queue

// 재시도 모두 실패 시 DLQ로 이동
@Processor('payment-webhook', { concurrency: 10 })
export class WebhookProcessor extends WorkerHost {
  constructor(
    @InjectQueue('dead-letter') private dlq: Queue,
  ) {
    super();
  }

  async process(job: Job) {
    try {
      const response = await this.httpService.post(
        job.data.webhookUrl,
        job.data.payload,
        { timeout: 5000 },
      );
      if (response.status >= 400) {
        throw new Error(`Webhook returned ${response.status}`);
      }
      return { status: response.status };
    } catch (error) {
      // 마지막 시도에서도 실패하면 DLQ로
      if (job.attemptsMade >= (job.opts.attempts ?? 3) - 1) {
        await this.dlq.add('webhook-failed', {
          originalQueue: 'payment-webhook',
          originalJobId: job.id,
          data: job.data,
          error: error.message,
          failedAt: new Date().toISOString(),
        });
      }
      throw error; // 재시도 트리거
    }
  }
}

모니터링 — Bull Board

프로덕션에서는 미들웨어로 Bull Board 대시보드를 통합합니다.

import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

// main.ts
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(app.get(getQueueToken('email'))),
    new BullMQAdapter(app.get(getQueueToken('image-processing'))),
    new BullMQAdapter(app.get(getQueueToken('payment-webhook'))),
  ],
  serverAdapter,
});

app.use('/admin/queues', authMiddleware, serverAdapter.getRouter());

운영 팁

  • Redis 분리: 캐시용 Redis와 큐용 Redis를 분리해야 FLUSHALL 사고 방지
  • Graceful Shutdown: enableShutdownHooks()로 진행 중인 작업 완료 후 종료
  • Stalled Jobs: Worker가 죽으면 BullMQ가 자동으로 stalled 감지 → 재시도. stalledIntervalmaxStalledCount 조정
  • Sandboxed Processor: CPU 집약 작업은 별도 Worker Thread에서 실행해 이벤트 루프 차단 방지
  • 테스트: Vitest에서 ioredis-mock으로 Redis 없이 큐 로직 단위 테스트 가능

정리

NestJS + BullMQ는 백그라운드 작업 처리의 표준 조합입니다. 단순 비동기 작업부터 Flow 기반 파이프라인, 반복 스케줄링, Dead Letter Queue까지 — 프로덕션 수준의 작업 큐 시스템을 데코레이터 기반으로 간결하게 구축할 수 있습니다. Redis의 신뢰성과 BullMQ의 원자적 Lua 스크립트가 결합되어, 작업 유실 없는 안정적인 비동기 처리를 보장합니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux