NestJS + BullMQ 큐: Producer

왜 큐가 필요한가: HTTP 요청 밖에서 일을 처리하는 이유

API 요청 안에서 이메일 발송, 이미지 리사이징, PDF 생성, 외부 API 호출 같은 무거운 작업을 동기적으로 처리하면 응답 시간이 길어지고, 실패 시 재시도가 불가능합니다. 큐(Queue)는 이런 작업을 비동기로 분리하여 API는 즉시 응답하고, 백그라운드 워커가 별도로 처리합니다.

NestJS는 @nestjs/bullmq 패키지를 통해 BullMQ(Redis 기반 큐 라이브러리)와 공식 통합을 제공합니다. 레거시 @nestjs/bull(Bull 4.x)도 있지만, NestJS 10+에서는 BullMQ 기반의 @nestjs/bullmq가 권장됩니다.

설치와 기본 설정

# BullMQ 기반 (권장)
npm install @nestjs/bullmq bullmq

# 또는 레거시 Bull 기반
# npm install @nestjs/bull bull
// app.module.ts
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
        // password: process.env.REDIS_PASSWORD,
      },
    }),
    // 큐 등록
    BullModule.registerQueue({
      name: 'email',        // 큐 이름
    }),
    BullModule.registerQueue({
      name: 'image-processing',
      defaultJobOptions: {
        attempts: 3,         // 기본 재시도 횟수
        backoff: {
          type: 'exponential',
          delay: 1000,       // 1초부터 지수 백오프
        },
        removeOnComplete: 100,  // 완료된 작업 100개만 유지
        removeOnFail: 500,      // 실패한 작업 500개 유지
      },
    }),
    EmailModule,
    ImageModule,
  ],
})
export class AppModule {}

Producer: 큐에 작업 추가하기

Producer는 큐에 작업(Job)을 추가하는 역할입니다. @InjectQueue() 데코레이터로 큐 인스턴스를 주입받습니다.

import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class EmailService {
  constructor(
    @InjectQueue('email') private readonly emailQueue: Queue,
  ) {}

  async sendWelcomeEmail(userId: number, email: string) {
    // 큐에 작업 추가 — 즉시 반환
    await this.emailQueue.add(
      'welcome',             // Job 이름 (라우팅에 사용)
      { userId, email },     // Job 데이터 (직렬화 가능해야 함)
      {
        priority: 1,         // 높은 우선순위 (낮을수록 우선)
        delay: 5000,         // 5초 후 실행
      },
    );
  }

  async sendBulkEmails(users: Array<{ id: number; email: string }>) {
    // 대량 작업 추가 — addBulk으로 한 번에 추가
    const jobs = users.map(user => ({
      name: 'newsletter',
      data: { userId: user.id, email: user.email },
      opts: { priority: 5 },
    }));

    await this.emailQueue.addBulk(jobs);
  }
}

Consumer (Processor): 큐 작업 처리하기

Consumer는 @Processor() 데코레이터가 붙은 클래스로, BullMQ의 Worker를 NestJS DI 시스템에 통합합니다.

import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';

@Processor('email')
export class EmailProcessor extends WorkerHost {
  private readonly logger = new Logger(EmailProcessor.name);

  async process(job: Job<{ userId: number; email: string }>): Promise<any> {
    this.logger.log(`Processing job ${job.id}: ${job.name}`);

    switch (job.name) {
      case 'welcome':
        return this.sendWelcome(job.data);
      case 'newsletter':
        return this.sendNewsletter(job.data);
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  }

  private async sendWelcome(data: { userId: number; email: string }) {
    // 실제 이메일 발송 로직
    await this.mailerService.send({
      to: data.email,
      subject: '환영합니다!',
      template: 'welcome',
    });
    return { sent: true };
  }

  private async sendNewsletter(data: { userId: number; email: string }) {
    // 뉴스레터 발송 로직
    await this.mailerService.send({
      to: data.email,
      subject: '주간 뉴스레터',
      template: 'newsletter',
    });
    return { sent: true };
  }
}

핵심: @Processor('email')'email' 큐의 Worker를 생성합니다. process() 메서드가 각 Job을 순차적으로(기본 concurrency=1) 처리합니다.

Job Options 완전 정리: 재시도·지연·우선순위·TTL

옵션 타입 기본값 설명
attempts number 0 (재시도 없음) 최대 시도 횟수 (1=한 번만, 3=원본+재시도2)
backoff object 없음 { type: 'exponential' | 'fixed', delay: ms }
delay number 0 밀리초 후 실행 (예약 작업)
priority number 0 낮을수록 높은 우선순위
removeOnComplete boolean | number false true=즉시 삭제, number=최대 N개 유지
removeOnFail boolean | number false 실패한 Job 보존 정책
jobId string 자동 생성 커스텀 ID — 중복 방지에 활용
repeat object 없음 반복 실행 (크론 대체)
// 실무 Job Options 예시
await this.emailQueue.add('welcome', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 2000,            // 2s → 4s → 8s → 16s → 32s
  },
  removeOnComplete: {
    age: 3600,              // 완료 후 1시간 보존
    count: 1000,            // 최대 1000개 보존
  },
  removeOnFail: {
    age: 86400 * 7,         // 실패 후 7일 보존
  },
});

Concurrency: 병렬 처리 제어

// 동시에 5개 Job을 병렬 처리
@Processor('image-processing', {
  concurrency: 5,
})
export class ImageProcessor extends WorkerHost {
  async process(job: Job) {
    // CPU/메모리 집중 작업
    return this.resizeImage(job.data.imageUrl, job.data.sizes);
  }
}
concurrency 적합한 작업 주의사항
1 (기본) 순서 보장 필요, 외부 API rate limit 처리량 제한
5~10 이메일 발송, HTTP 호출 외부 서비스 부하 고려
CPU 코어 수 이미지/비디오 처리 Node.js 이벤트 루프 블로킹 주의

이벤트 리스닝: Job 생명주기 추적

BullMQ의 Worker는 다양한 이벤트를 발생시킵니다. @OnWorkerEvent() 데코레이터로 Processor 클래스 안에서 이벤트를 처리할 수 있습니다.

import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('email')
export class EmailProcessor extends WorkerHost {
  async process(job: Job): Promise<any> {
    // 처리 로직
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.log(`Job ${job.id} completed. Result: ${JSON.stringify(job.returnvalue)}`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, error: Error) {
    this.logger.error(
      `Job ${job.id} failed after ${job.attemptsMade} attempts: ${error.message}`,
    );
    // Slack/Discord 알림, 모니터링 시스템에 보고
    this.alertService.notify(`Email job failed: ${job.id}`);
  }

  @OnWorkerEvent('progress')
  onProgress(job: Job, progress: number) {
    this.logger.log(`Job ${job.id} progress: ${progress}%`);
  }

  @OnWorkerEvent('stalled')
  onStalled(jobId: string) {
    this.logger.warn(`Job ${jobId} stalled — worker may have crashed`);
  }
}

주요 이벤트 정리

이벤트 발생 시점 활용
completed Job 성공 완료 후속 작업 트리거, 로깅
failed 모든 재시도 소진 후 최종 실패 알림, DLQ 처리
progress Job 내에서 job.updateProgress() 호출 시 진행률 표시
stalled Worker가 lock 갱신 실패 (크래시 추정) 모니터링, 재할당

Repeatable Jobs: 크론 대체 스케줄링

BullMQ의 repeat 옵션으로 주기적 작업을 스케줄링할 수 있습니다. @nestjs/schedule과 달리 Redis에 상태가 저장되므로 여러 인스턴스에서 중복 실행되지 않습니다.

// 모듈 초기화 시 Repeatable Job 등록
@Injectable()
export class ScheduleSetupService implements OnModuleInit {
  constructor(
    @InjectQueue('maintenance') private readonly maintenanceQueue: Queue,
  ) {}

  async onModuleInit() {
    // 매일 자정에 실행
    await this.maintenanceQueue.add(
      'cleanup-expired-sessions',
      {},                          // data
      {
        repeat: {
          pattern: '0 0 * * *',    // 크론 표현식
          tz: 'Asia/Seoul',        // 타임존
        },
        jobId: 'cleanup-sessions', // 고정 ID로 중복 등록 방지
      },
    );

    // 5분마다 실행
    await this.maintenanceQueue.add(
      'health-check',
      {},
      {
        repeat: {
          every: 300000,           // 300초 = 5분
        },
        jobId: 'health-check',
      },
    );
  }
}

중복 방지: jobId를 고정하면 애플리케이션이 재시작되어도 같은 ID의 repeatable job이 중복 등록되지 않습니다. BullMQ는 동일한 repeat 설정 + jobId 조합은 하나만 유지합니다.

Job Progress: 대용량 작업의 진행률 추적

@Processor('export')
export class ExportProcessor extends WorkerHost {
  async process(job: Job<{ userId: number; format: string }>) {
    const totalRecords = await this.getRecordCount(job.data.userId);
    let processed = 0;

    for await (const batch of this.getBatches(job.data.userId, 1000)) {
      await this.exportBatch(batch, job.data.format);
      processed += batch.length;

      // 진행률 업데이트 (0~100)
      await job.updateProgress(Math.round((processed / totalRecords) * 100));
    }

    return { totalExported: processed };
  }
}

// API에서 진행률 조회
@Controller('exports')
export class ExportController {
  constructor(@InjectQueue('export') private readonly exportQueue: Queue) {}

  @Get(':jobId/progress')
  async getProgress(@Param('jobId') jobId: string) {
    const job = await this.exportQueue.getJob(jobId);
    if (!job) throw new NotFoundException();

    return {
      state: await job.getState(),   // 'waiting' | 'active' | 'completed' | 'failed'
      progress: job.progress,         // 0~100
      result: job.returnvalue,
    };
  }
}

Separate Processes: CPU 집약 작업의 격리

Node.js는 단일 스레드이므로, CPU 집약적인 Job이 이벤트 루프를 블로킹하면 HTTP 요청 처리도 멈춥니다. BullMQ는 별도 프로세스(sandboxed processor)에서 Job을 실행하는 기능을 제공합니다.

// image.processor.ts — 별도 파일로 분리
// 이 파일은 NestJS DI 컨텍스트 밖에서 실행됨
import { SandboxedJob } from 'bullmq';

export default async function (job: SandboxedJob) {
  // CPU 집약적 이미지 처리
  const sharp = require('sharp');
  const result = await sharp(job.data.buffer)
    .resize(job.data.width, job.data.height)
    .toBuffer();

  return { size: result.length };
}
// 모듈에서 sandboxed processor 등록
BullModule.registerQueue({
  name: 'image-processing',
  processors: [{
    path: join(__dirname, 'image.processor.js'),
    concurrency: 4,
  }],
})

주의: Sandboxed processor는 NestJS DI 컨테이너에 접근할 수 없습니다. 서비스 주입이 필요하면 WorkerHost를 사용하고, 순수 CPU 작업만 sandboxed로 분리합니다.

Redis 연결 관리와 장애 대응

BullModule.forRoot({
  connection: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
    maxRetriesPerRequest: null,    // BullMQ 필수: null로 설정
    enableReadyCheck: false,       // 클러스터 환경에서 권장
  },
})

필수 설정: maxRetriesPerRequest: null은 BullMQ가 Redis 연결이 끊겼을 때 무한히 재연결을 시도하도록 합니다. 이 값이 기본값(20)이면 연결 실패 시 Worker가 중지됩니다. BullMQ 공식 문서에서 이 설정을 필수로 요구합니다.

Bull Board: 웹 UI 대시보드

npm install @bull-board/api @bull-board/express @bull-board/nestjs
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';

@Module({
  imports: [
    BullBoardModule.forRoot({
      route: '/queues',          // 대시보드 경로
      adapter: ExpressAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'email',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'image-processing',
      adapter: BullMQAdapter,
    }),
  ],
})
export class AppModule {}

// http://localhost:3000/queues 에서 대시보드 접근

Bull Board에서 확인 가능한 정보:

  • 큐별 Waiting / Active / Completed / Failed / Delayed 작업 수
  • 개별 Job의 데이터, 로그, 에러 스택트레이스
  • 실패한 Job 수동 재시도
  • Repeatable Job 관리

@nestjs/bull vs @nestjs/bullmq 비교

항목 @nestjs/bull (레거시) @nestjs/bullmq (권장)
기반 라이브러리 Bull 4.x BullMQ 4.x+
Processor 방식 @Process() 데코레이터 WorkerHost 클래스 상속
이벤트 리스닝 @OnQueueActive() @OnWorkerEvent()
Flow (의존성 체인) 미지원 지원 (FlowProducer)
Group Rate Limiting 미지원 지원
유지보수 유지보수 모드 활발한 개발

운영 Best Practice 체크리스트

  • removeOnComplete/removeOnFail 필수: 설정하지 않으면 Redis 메모리가 무한 증가합니다. 완료된 Job은 100~1000개 또는 시간 기반(age)으로 제한하세요.
  • attempts + backoff 설정: 외부 API 호출 작업에는 최소 3회 재시도 + 지수 백오프를 설정하세요.
  • jobId로 멱등성 보장: 동일한 jobId는 큐에 중복 추가되지 않습니다. 주문 ID 등 비즈니스 키를 jobId로 사용하면 중복 처리를 방지할 수 있습니다.
  • Stalled Job 모니터링: Worker가 크래시하면 작업이 stalled 상태가 됩니다. stalledInterval 설정과 stalled 이벤트 모니터링을 추가하세요.
  • dedicated Redis 사용: 캐시용 Redis와 큐용 Redis를 분리하세요. 캐시 eviction으로 큐 데이터가 삭제되면 작업이 유실됩니다.
  • Graceful Shutdown: 애플리케이션 종료 시 Worker가 진행 중인 Job을 완료할 수 있도록 enableShutdownHooks()를 설정하세요.
// main.ts — Graceful Shutdown
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableShutdownHooks();   // SIGTERM 시 Worker 정리
  await app.listen(3000);
}

참고 자료

📥 관련 무료 이북

NestJS + TypeORM 실전 가이드 — 실전 가이드 무료 제공

무료로 받기 →

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux