NestJS 스트리밍 업로드란?
기본 Multer 파일 업로드는 전체 파일을 메모리나 디스크에 버퍼링한 뒤 처리합니다. 수백 MB~GB 파일에서는 메모리 폭발이나 디스크 I/O 병목이 발생합니다. 스트리밍 업로드는 Readable Stream을 직접 소비하여 메모리 사용량을 일정하게 유지하면서 S3나 스토리지로 파이프합니다. NestJS에서 Multer의 한계를 넘어 busboy 기반 스트리밍, 청크 업로드, 프로그레스 추적까지 구현하는 방법을 다룹니다.
1. Multer 메모리 문제
| 방식 | 메모리 사용 | 최대 파일 크기 | 적합한 상황 |
|---|---|---|---|
| Multer memoryStorage | 파일 크기 × 동시 요청 | ~50MB | 썸네일, 프로필 이미지 |
| Multer diskStorage | 낮음 (디스크 I/O) | ~500MB | 중간 크기 파일 |
| busboy 스트리밍 | ~64KB (버퍼 크기) | 무제한 | 대용량 비디오, 백업 |
| Presigned URL (클라이언트 직접) | 0 (서버 경유 안 함) | 5TB (S3) | 최대 크기, 서버 부하 제거 |
2. busboy 스트리밍 업로드 구현
Multer를 우회하고 busboy로 직접 multipart 스트림을 파싱합니다.
npm install busboy @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install -D @types/busboy
// upload/streaming-upload.service.ts
import { Injectable, BadRequestException } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import * as busboy from 'busboy';
import { Request } from 'express';
import { PassThrough } from 'stream';
import { randomUUID } from 'crypto';
interface UploadResult {
key: string;
bucket: string;
size: number;
contentType: string;
}
@Injectable()
export class StreamingUploadService {
private s3: S3Client;
private bucket: string;
constructor(private config: ConfigService) {
this.s3 = new S3Client({
region: config.get('AWS_REGION'),
credentials: {
accessKeyId: config.get('AWS_ACCESS_KEY'),
secretAccessKey: config.get('AWS_SECRET_KEY'),
},
});
this.bucket = config.get('S3_BUCKET');
}
async streamToS3(
req: Request,
options?: { maxSize?: number; allowedTypes?: string[] },
): Promise<UploadResult> {
const maxSize = options?.maxSize ?? 1024 * 1024 * 500; // 500MB
const allowedTypes = options?.allowedTypes ?? [
'video/mp4', 'video/webm', 'application/zip', 'application/pdf',
];
return new Promise((resolve, reject) => {
const bb = busboy({
headers: req.headers,
limits: {
fileSize: maxSize,
files: 1, // 단일 파일만
},
});
let uploadResult: UploadResult | null = null;
bb.on('file', async (fieldname, stream, info) => {
const { filename, mimeType } = info;
// MIME 타입 검증
if (!allowedTypes.includes(mimeType)) {
stream.resume(); // 스트림 소비하여 정리
reject(new BadRequestException(
`Unsupported type: ${mimeType}`
));
return;
}
const key = `uploads/${randomUUID()}/${filename}`;
let totalBytes = 0;
// 크기 추적용 PassThrough
const tracker = new PassThrough();
tracker.on('data', (chunk) => {
totalBytes += chunk.length;
});
// 파일 크기 초과 감지
stream.on('limit', () => {
stream.destroy();
reject(new BadRequestException(
`File exceeds ${maxSize / 1024 / 1024}MB limit`
));
});
try {
// S3 멀티파트 업로드 (자동 청크 분할)
const upload = new Upload({
client: this.s3,
params: {
Bucket: this.bucket,
Key: key,
Body: stream.pipe(tracker),
ContentType: mimeType,
},
queueSize: 4, // 동시 파트 업로드 수
partSize: 10 * 1024 * 1024, // 10MB per part
});
await upload.done();
uploadResult = {
key,
bucket: this.bucket,
size: totalBytes,
contentType: mimeType,
};
} catch (err) {
reject(err);
}
});
bb.on('close', () => {
if (uploadResult) resolve(uploadResult);
});
bb.on('error', reject);
req.pipe(bb);
});
}
}
3. Controller: Raw Body 접근
// upload/upload.controller.ts
@Controller('upload')
export class UploadController {
constructor(private streamingUpload: StreamingUploadService) {}
// Multer 인터셉터 없이 Raw Request 접근
@Post('stream')
@UseGuards(JwtAuthGuard)
@Header('Connection', 'keep-alive')
async streamUpload(
@Req() req: Request,
@CurrentUser() user: User,
): Promise<UploadResult> {
// Content-Type 검증
if (!req.headers['content-type']?.includes('multipart/form-data')) {
throw new BadRequestException('multipart/form-data required');
}
const result = await this.streamingUpload.streamToS3(req, {
maxSize: 1024 * 1024 * 200, // 200MB
allowedTypes: ['video/mp4', 'video/webm', 'image/png', 'image/jpeg'],
});
// DB에 메타데이터 저장
await this.fileService.saveMetadata({
...result,
uploadedBy: user.id,
});
return result;
}
}
주의: NestJS의 기본 body parser가 multipart를 먼저 소비하면 스트리밍이 불가합니다. main.ts에서 특정 라우트를 body parsing에서 제외해야 합니다.
// main.ts
const app = await NestFactory.create(AppModule, {
bodyParser: false, // 전역 비활성화
});
// 스트리밍 라우트 외에는 다시 활성화
import * as bodyParser from 'body-parser';
app.use('/upload/stream', (req, res, next) => next()); // skip
app.use(bodyParser.json({ limit: '10mb' }));
app.use(bodyParser.urlencoded({ extended: true }));
4. 청크 업로드: 클라이언트 제어
네트워크 불안정 환경에서는 클라이언트가 파일을 청크로 나눠 전송하고, 실패한 청크만 재전송하는 패턴이 필요합니다.
// upload/chunked-upload.service.ts
@Injectable()
export class ChunkedUploadService {
constructor(
@InjectRedis() private redis: Redis,
private s3Service: S3Service,
) {}
// 1단계: 업로드 세션 초기화
async initUpload(dto: InitUploadDto): Promise<UploadSession> {
const sessionId = randomUUID();
const { createMultipartUpload } = await this.s3Service
.initiateMultipart(dto.filename, dto.contentType);
const session: UploadSession = {
id: sessionId,
s3UploadId: createMultipartUpload.UploadId,
key: `uploads/${sessionId}/${dto.filename}`,
totalChunks: dto.totalChunks,
uploadedParts: [],
createdAt: new Date(),
};
await this.redis.set(
`upload:${sessionId}`,
JSON.stringify(session),
'EX', 3600, // 1시간 만료
);
return session;
}
// 2단계: 개별 청크 업로드
async uploadChunk(
sessionId: string,
partNumber: number,
body: Buffer,
): Promise<{ etag: string }> {
const session = await this.getSession(sessionId);
const { ETag } = await this.s3Service.uploadPart({
key: session.key,
uploadId: session.s3UploadId,
partNumber,
body,
});
// 파트 정보 저장
session.uploadedParts.push({ partNumber, etag: ETag });
await this.redis.set(
`upload:${sessionId}`,
JSON.stringify(session),
'EX', 3600,
);
return { etag: ETag };
}
// 3단계: 업로드 완료
async completeUpload(sessionId: string): Promise<UploadResult> {
const session = await this.getSession(sessionId);
if (session.uploadedParts.length !== session.totalChunks) {
throw new BadRequestException(
`Missing chunks: ${session.uploadedParts.length}/${session.totalChunks}`
);
}
const result = await this.s3Service.completeMultipart({
key: session.key,
uploadId: session.s3UploadId,
parts: session.uploadedParts
.sort((a, b) => a.partNumber - b.partNumber),
});
await this.redis.del(`upload:${sessionId}`);
return result;
}
// 업로드 진행 상황 조회
async getProgress(sessionId: string): Promise<UploadProgress> {
const session = await this.getSession(sessionId);
return {
sessionId,
totalChunks: session.totalChunks,
uploadedChunks: session.uploadedParts.length,
percent: Math.round(
(session.uploadedParts.length / session.totalChunks) * 100
),
missingParts: this.findMissingParts(session),
};
}
private findMissingParts(session: UploadSession): number[] {
const uploaded = new Set(session.uploadedParts.map(p => p.partNumber));
const missing: number[] = [];
for (let i = 1; i <= session.totalChunks; i++) {
if (!uploaded.has(i)) missing.push(i);
}
return missing;
}
}
5. SSE 프로그레스 추적
// upload/upload.controller.ts
@Sse('progress/:sessionId')
uploadProgress(
@Param('sessionId') sessionId: string,
): Observable<MessageEvent> {
return new Observable((subscriber) => {
const interval = setInterval(async () => {
try {
const progress = await this.chunkedUpload.getProgress(sessionId);
subscriber.next({
data: progress,
type: 'progress',
} as MessageEvent);
if (progress.percent === 100) {
subscriber.next({
data: { status: 'complete' },
type: 'complete',
} as MessageEvent);
subscriber.complete();
clearInterval(interval);
}
} catch {
clearInterval(interval);
subscriber.complete();
}
}, 1000);
return () => clearInterval(interval);
});
}
6. 바이러스 스캔 파이프라인
// upload/virus-scan.service.ts
@Injectable()
export class VirusScanService {
// ClamAV 소켓 연결
async scanStream(stream: Readable): Promise<ScanResult> {
return new Promise((resolve, reject) => {
const clamav = net.createConnection(
{ host: 'clamav', port: 3310 },
() => {
clamav.write('zINSTREAM ');
}
);
stream.on('data', (chunk: Buffer) => {
// ClamAV 프로토콜: 4바이트 크기 + 데이터
const size = Buffer.alloc(4);
size.writeUInt32BE(chunk.length, 0);
clamav.write(size);
clamav.write(chunk);
});
stream.on('end', () => {
// 종료 시그널: 크기 0
const end = Buffer.alloc(4);
end.writeUInt32BE(0, 0);
clamav.write(end);
});
let response = '';
clamav.on('data', (data) => { response += data.toString(); });
clamav.on('end', () => {
const clean = response.includes('OK');
resolve({
clean,
message: response.trim(),
});
});
clamav.on('error', reject);
});
}
}
// 스트리밍 파이프라인에 통합
async streamWithScan(req: Request): Promise<UploadResult> {
// Tee 스트림: 동시에 스캔 + S3 업로드
const [scanStream, uploadStream] = teeStream(fileStream);
const [scanResult, uploadResult] = await Promise.all([
this.virusScan.scanStream(scanStream),
this.uploadToS3(uploadStream),
]);
if (!scanResult.clean) {
await this.s3Service.deleteObject(uploadResult.key);
throw new BadRequestException('Virus detected: ' + scanResult.message);
}
return uploadResult;
}
7. 성능 체크리스트
| 항목 | 권장 | 이유 |
|---|---|---|
| highWaterMark | 64KB~256KB | 메모리 vs 처리량 균형 |
| S3 partSize | 10~25MB | 파트 수 제한 (최대 10,000) |
| 동시 업로드 제한 | Pod당 10~20 | 메모리·네트워크 보호 |
| 타임아웃 | 요청 크기 비례 | 대용량에 고정 타임아웃 부적합 |
| Nginx client_max_body_size | 앱 제한과 일치 | Nginx에서 먼저 413 발생 방지 |
마무리
NestJS에서 대용량 파일 업로드는 Multer의 버퍼링 한계를 넘어 busboy 스트리밍, S3 멀티파트, 청크 재전송으로 해결합니다. @aws-sdk/lib-storage의 Upload 클래스가 자동 멀티파트 분할을 처리하고, Redis 기반 세션으로 청크 업로드 상태를 관리합니다. NestJS 파일 업로드 기초를 이해한 뒤, NestJS SSE 스트리밍으로 실시간 프로그레스 추적을 적용하는 것을 권장합니다.