Node.js Streams란?
Node.js Streams는 대용량 데이터를 청크(chunk) 단위로 처리하는 추상화입니다. 파일 전체를 메모리에 올리지 않고 조각씩 읽어 처리하므로, 수 GB 파일도 수십 MB 메모리로 처리할 수 있습니다. HTTP 요청/응답, 파일 I/O, 데이터 변환, 압축 등 Node.js의 핵심 API가 모두 Stream 기반입니다.
이 글에서는 4가지 Stream 유형, 백프레셔(Backpressure) 메커니즘, pipeline API, Transform Stream 패턴, 그리고 실전 대용량 처리 예제까지 심층적으로 다룹니다. NestJS StreamableFile 스트리밍 가이드와 함께 읽으면 NestJS에서의 실전 활용까지 이해할 수 있습니다.
4가지 Stream 유형
| 유형 | 역할 | 대표 예시 | 핵심 메서드 |
|---|---|---|---|
| Readable | 데이터 소스 (읽기) | fs.createReadStream, http.IncomingMessage | read(), push(), _read() |
| Writable | 데이터 목적지 (쓰기) | fs.createWriteStream, http.ServerResponse | write(), end(), _write() |
| Transform | 데이터 변환 (읽기+쓰기) | zlib.createGzip, crypto.createCipher | _transform(), _flush() |
| Duplex | 독립적 읽기+쓰기 | net.Socket, tls.TLSSocket | _read(), _write() |
백프레셔(Backpressure) 원리
백프레셔는 Stream의 가장 중요한 개념입니다. 생산 속도 > 소비 속도일 때, 생산자를 자동으로 일시 중지하여 메모리 폭발을 방지합니다.
// ❌ 안티패턴: 백프레셔 무시 → 메모리 폭발
const readable = fs.createReadStream('huge-file.csv'); // 읽기: 빠름
const writable = fs.createWriteStream('output.csv'); // 쓰기: 느림 (디스크)
readable.on('data', (chunk) => {
// write()의 반환값을 무시하면 내부 버퍼가 무한 증가!
writable.write(processChunk(chunk));
// → 10GB 파일 처리 시 메모리 수 GB 사용
});
// ✅ 올바른 패턴: write() 반환값으로 백프레셔 처리
readable.on('data', (chunk) => {
const canContinue = writable.write(processChunk(chunk));
if (!canContinue) {
// 내부 버퍼가 highWaterMark에 도달 → 읽기 일시 중지
readable.pause();
// writable 버퍼가 비워지면 다시 읽기 시작
writable.once('drain', () => {
readable.resume();
});
}
});
// ✅ 가장 좋은 패턴: pipe() 또는 pipeline() 사용
// 백프레셔를 자동으로 처리함
readable.pipe(writable);
highWaterMark 이해
// highWaterMark = 내부 버퍼 임계값 (바이트)
// 기본값: Readable 16KB, Writable 16KB
const readable = fs.createReadStream('file.dat', {
highWaterMark: 64 * 1024, // 64KB로 증가
// 큰 값: 적은 I/O 호출, 더 많은 메모리
// 작은 값: 많은 I/O 호출, 적은 메모리
});
// objectMode에서는 바이트가 아니라 객체 수
const transform = new Transform({
objectMode: true,
highWaterMark: 16, // 16개 객체까지 버퍼링
});
// 버퍼 상태 확인
console.log(readable.readableLength); // 현재 버퍼 크기
console.log(readable.readableHighWaterMark); // 임계값
console.log(writable.writableLength); // 현재 버퍼 크기
console.log(writable.writableHighWaterMark); // 임계값
pipeline() API (권장)
stream.pipeline()은 Node.js 10+에서 도입된 안전한 Stream 연결 API입니다. pipe()의 에러 처리 문제를 해결합니다.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { Transform } from 'node:stream';
// pipe()의 문제점
readable
.pipe(transform) // transform에서 에러 → readable이 정리 안됨!
.pipe(writable); // 에러 전파 안됨, 메모리 누수
// ✅ pipeline(): 에러 시 모든 스트림 자동 정리 (destroy)
await pipeline(
createReadStream('input.csv'),
new Transform({
transform(chunk, encoding, callback) {
// CSV → JSON 변환
const lines = chunk.toString().split('n');
const json = lines.map(line => {
const [name, age, email] = line.split(',');
return JSON.stringify({ name, age: Number(age), email });
}).join('n');
callback(null, json);
}
}),
createGzip(),
createWriteStream('output.json.gz')
);
console.log('파이프라인 완료');
// AbortController로 파이프라인 취소
const ac = new AbortController();
setTimeout(() => ac.abort(), 5000); // 5초 후 취소
try {
await pipeline(readable, transform, writable, { signal: ac.signal });
} catch (err) {
if (err.code === 'ABORT_ERR') {
console.log('파이프라인 취소됨');
}
}
Custom Transform Stream 패턴
import { Transform } from 'node:stream';
// 패턴 1: 줄 단위 파싱 (바이트 스트림 → 줄 스트림)
class LineParser extends Transform {
private buffer = '';
constructor() {
super({ objectMode: true }); // 출력: 객체(문자열) 단위
}
_transform(chunk: Buffer, encoding: string, callback: Function) {
this.buffer += chunk.toString();
const lines = this.buffer.split('n');
// 마지막 요소는 불완전한 줄일 수 있으므로 버퍼에 유지
this.buffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) {
this.push(line); // 완성된 줄을 downstream으로 전달
}
}
callback();
}
_flush(callback: Function) {
// 스트림 종료 시 남은 버퍼 처리
if (this.buffer.trim()) {
this.push(this.buffer);
}
callback();
}
}
// 패턴 2: 배치 처리 (N개씩 묶어서 전달)
class BatchTransform extends Transform {
private batch: any[] = [];
constructor(private batchSize: number) {
super({ objectMode: true });
}
_transform(item: any, encoding: string, callback: Function) {
this.batch.push(item);
if (this.batch.length >= this.batchSize) {
this.push(this.batch);
this.batch = [];
}
callback();
}
_flush(callback: Function) {
if (this.batch.length > 0) {
this.push(this.batch);
}
callback();
}
}
// 패턴 3: 비동기 Transform (DB 조회, API 호출 등)
class AsyncEnricher extends Transform {
constructor() {
super({ objectMode: true, highWaterMark: 8 }); // 동시 처리 제한
}
async _transform(record: any, encoding: string, callback: Function) {
try {
const enriched = await fetchFromAPI(record.id);
this.push({ ...record, ...enriched });
callback();
} catch (err) {
callback(err);
}
}
}
// 조합 사용
await pipeline(
createReadStream('users.csv'),
new LineParser(),
new BatchTransform(100),
new AsyncEnricher(),
createWriteStream('enriched.jsonl')
);
실전: 대용량 CSV → DB 임포트
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Transform, Writable } from 'node:stream';
// 100만 행 CSV를 Stream으로 DB에 배치 INSERT
async function importCSV(filePath: string, pool: Pool) {
let totalRows = 0;
const csvParser = new LineParser();
const batcher = new BatchTransform(1000); // 1000행씩 배치
const dbWriter = new Writable({
objectMode: true,
highWaterMark: 4, // 동시 배치 4개까지만 버퍼링
async write(batch: string[], encoding, callback) {
try {
const values = batch.map(line => {
const [name, email, age] = line.split(',');
return `('${name}','${email}',${age})`;
}).join(',');
await pool.query(
`INSERT INTO users (name, email, age) VALUES ${values}`
);
totalRows += batch.length;
if (totalRows % 10000 === 0) {
console.log(`${totalRows} rows imported...`);
}
callback();
} catch (err) {
callback(err as Error);
}
}
});
await pipeline(
createReadStream(filePath),
csvParser,
batcher,
dbWriter
);
console.log(`완료: ${totalRows} rows imported`);
// → 메모리 사용량: ~50MB 이하 (100만 행이어도!)
}
실전: HTTP 스트리밍 프록시
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';
// Express/NestJS에서 대용량 API 응답 스트리밍
app.get('/export/users', async (req, res) => {
const cursor = db.collection('users').find().cursor();
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Content-Encoding', 'gzip');
const jsonSerializer = new Transform({
objectMode: true,
transform(doc, encoding, callback) {
callback(null, JSON.stringify(doc) + 'n');
}
});
try {
await pipeline(
cursor, // MongoDB cursor (Readable)
jsonSerializer, // 객체 → NDJSON 변환
createGzip(), // Gzip 압축
res // HTTP Response (Writable)
);
} catch (err) {
// 클라이언트 연결 끊김 등 에러 처리
if (!res.headersSent) {
res.status(500).json({ error: 'Export failed' });
}
}
});
// Async Iterator로 Readable 생성 (Node.js 16+)
import { Readable } from 'node:stream';
async function* generateData() {
for (let i = 0; i < 1000000; i++) {
yield JSON.stringify({ id: i, data: `row-${i}` }) + 'n';
// 매 1000개마다 이벤트 루프 양보
if (i % 1000 === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
}
const readable = Readable.from(generateData());
for await…of 패턴
// Readable Stream을 async iterator로 소비
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
// 줄 단위 처리 (readline + async iterator)
const rl = createInterface({
input: createReadStream('large-log.txt'),
crlfDelay: Infinity,
});
let errorCount = 0;
for await (const line of rl) {
if (line.includes('ERROR')) {
errorCount++;
// 백프레셔가 자동으로 적용됨
// for await가 다음 청크를 요청하기 전까지 읽기 일시 중지
}
}
console.log(`Error lines: ${errorCount}`);
// ⚠️ 주의: for await에서 break하면 스트림이 destroy됨
for await (const chunk of readable) {
if (found) break; // → readable.destroy() 자동 호출
}
메모리 비교: Buffer vs Stream
| 방식 | 1GB 파일 처리 메모리 | 첫 바이트 응답 시간 | 에러 시 정리 |
|---|---|---|---|
| fs.readFile (Buffer) | ~1GB+ | 파일 전체 읽은 후 | GC 의존 |
| Stream + pipe() | ~16KB (highWaterMark) | 첫 청크 즉시 | 수동 정리 필요 |
| Stream + pipeline() | ~16KB (highWaterMark) | 첫 청크 즉시 | 자동 destroy |
NestJS 성능 최적화 실전 가이드에서 Stream을 활용한 API 성능 개선 사례도 확인하세요.
정리
Node.js Streams는 대용량 데이터 처리의 핵심입니다. 백프레셔를 통해 생산자-소비자 간 속도 차이를 자동 조절하고, pipeline()으로 에러 시 모든 스트림을 안전하게 정리합니다. Transform Stream으로 줄 파싱, 배치 처리, 비동기 변환을 구현하고, async iterator로 직관적인 소비 패턴을 사용할 수 있습니다. 파일, HTTP, DB 커서 등 I/O 바운드 작업에서 Stream을 활용하면 메모리 효율이 극적으로 향상됩니다.