Node.js Streams 백프레셔 심화

Node.js Streams란?

Node.js Streams는 대용량 데이터를 청크(chunk) 단위로 처리하는 추상화입니다. 파일 전체를 메모리에 올리지 않고 조각씩 읽어 처리하므로, 수 GB 파일도 수십 MB 메모리로 처리할 수 있습니다. HTTP 요청/응답, 파일 I/O, 데이터 변환, 압축 등 Node.js의 핵심 API가 모두 Stream 기반입니다.

이 글에서는 4가지 Stream 유형, 백프레셔(Backpressure) 메커니즘, pipeline API, Transform Stream 패턴, 그리고 실전 대용량 처리 예제까지 심층적으로 다룹니다. NestJS StreamableFile 스트리밍 가이드와 함께 읽으면 NestJS에서의 실전 활용까지 이해할 수 있습니다.

4가지 Stream 유형

유형 역할 대표 예시 핵심 메서드
Readable 데이터 소스 (읽기) fs.createReadStream, http.IncomingMessage read(), push(), _read()
Writable 데이터 목적지 (쓰기) fs.createWriteStream, http.ServerResponse write(), end(), _write()
Transform 데이터 변환 (읽기+쓰기) zlib.createGzip, crypto.createCipher _transform(), _flush()
Duplex 독립적 읽기+쓰기 net.Socket, tls.TLSSocket _read(), _write()

백프레셔(Backpressure) 원리

백프레셔는 Stream의 가장 중요한 개념입니다. 생산 속도 > 소비 속도일 때, 생산자를 자동으로 일시 중지하여 메모리 폭발을 방지합니다.

// ❌ 안티패턴: 백프레셔 무시 → 메모리 폭발
const readable = fs.createReadStream('huge-file.csv');  // 읽기: 빠름
const writable = fs.createWriteStream('output.csv');    // 쓰기: 느림 (디스크)

readable.on('data', (chunk) => {
  // write()의 반환값을 무시하면 내부 버퍼가 무한 증가!
  writable.write(processChunk(chunk));
  // → 10GB 파일 처리 시 메모리 수 GB 사용
});

// ✅ 올바른 패턴: write() 반환값으로 백프레셔 처리
readable.on('data', (chunk) => {
  const canContinue = writable.write(processChunk(chunk));
  
  if (!canContinue) {
    // 내부 버퍼가 highWaterMark에 도달 → 읽기 일시 중지
    readable.pause();
    
    // writable 버퍼가 비워지면 다시 읽기 시작
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

// ✅ 가장 좋은 패턴: pipe() 또는 pipeline() 사용
// 백프레셔를 자동으로 처리함
readable.pipe(writable);

highWaterMark 이해

// highWaterMark = 내부 버퍼 임계값 (바이트)
// 기본값: Readable 16KB, Writable 16KB

const readable = fs.createReadStream('file.dat', {
  highWaterMark: 64 * 1024,  // 64KB로 증가
  // 큰 값: 적은 I/O 호출, 더 많은 메모리
  // 작은 값: 많은 I/O 호출, 적은 메모리
});

// objectMode에서는 바이트가 아니라 객체 수
const transform = new Transform({
  objectMode: true,
  highWaterMark: 16,  // 16개 객체까지 버퍼링
});

// 버퍼 상태 확인
console.log(readable.readableLength);     // 현재 버퍼 크기
console.log(readable.readableHighWaterMark); // 임계값
console.log(writable.writableLength);     // 현재 버퍼 크기
console.log(writable.writableHighWaterMark); // 임계값

pipeline() API (권장)

stream.pipeline()은 Node.js 10+에서 도입된 안전한 Stream 연결 API입니다. pipe()의 에러 처리 문제를 해결합니다.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { Transform } from 'node:stream';

// pipe()의 문제점
readable
  .pipe(transform)  // transform에서 에러 → readable이 정리 안됨!
  .pipe(writable);  // 에러 전파 안됨, 메모리 누수

// ✅ pipeline(): 에러 시 모든 스트림 자동 정리 (destroy)
await pipeline(
  createReadStream('input.csv'),
  new Transform({
    transform(chunk, encoding, callback) {
      // CSV → JSON 변환
      const lines = chunk.toString().split('n');
      const json = lines.map(line => {
        const [name, age, email] = line.split(',');
        return JSON.stringify({ name, age: Number(age), email });
      }).join('n');
      callback(null, json);
    }
  }),
  createGzip(),
  createWriteStream('output.json.gz')
);
console.log('파이프라인 완료');

// AbortController로 파이프라인 취소
const ac = new AbortController();
setTimeout(() => ac.abort(), 5000);  // 5초 후 취소

try {
  await pipeline(readable, transform, writable, { signal: ac.signal });
} catch (err) {
  if (err.code === 'ABORT_ERR') {
    console.log('파이프라인 취소됨');
  }
}

Custom Transform Stream 패턴

import { Transform } from 'node:stream';

// 패턴 1: 줄 단위 파싱 (바이트 스트림 → 줄 스트림)
class LineParser extends Transform {
  private buffer = '';

  constructor() {
    super({ objectMode: true }); // 출력: 객체(문자열) 단위
  }

  _transform(chunk: Buffer, encoding: string, callback: Function) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('n');
    
    // 마지막 요소는 불완전한 줄일 수 있으므로 버퍼에 유지
    this.buffer = lines.pop() || '';
    
    for (const line of lines) {
      if (line.trim()) {
        this.push(line);  // 완성된 줄을 downstream으로 전달
      }
    }
    callback();
  }

  _flush(callback: Function) {
    // 스트림 종료 시 남은 버퍼 처리
    if (this.buffer.trim()) {
      this.push(this.buffer);
    }
    callback();
  }
}

// 패턴 2: 배치 처리 (N개씩 묶어서 전달)
class BatchTransform extends Transform {
  private batch: any[] = [];
  
  constructor(private batchSize: number) {
    super({ objectMode: true });
  }

  _transform(item: any, encoding: string, callback: Function) {
    this.batch.push(item);
    
    if (this.batch.length >= this.batchSize) {
      this.push(this.batch);
      this.batch = [];
    }
    callback();
  }

  _flush(callback: Function) {
    if (this.batch.length > 0) {
      this.push(this.batch);
    }
    callback();
  }
}

// 패턴 3: 비동기 Transform (DB 조회, API 호출 등)
class AsyncEnricher extends Transform {
  constructor() {
    super({ objectMode: true, highWaterMark: 8 }); // 동시 처리 제한
  }

  async _transform(record: any, encoding: string, callback: Function) {
    try {
      const enriched = await fetchFromAPI(record.id);
      this.push({ ...record, ...enriched });
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// 조합 사용
await pipeline(
  createReadStream('users.csv'),
  new LineParser(),
  new BatchTransform(100),
  new AsyncEnricher(),
  createWriteStream('enriched.jsonl')
);

실전: 대용량 CSV → DB 임포트

import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { Transform, Writable } from 'node:stream';

// 100만 행 CSV를 Stream으로 DB에 배치 INSERT
async function importCSV(filePath: string, pool: Pool) {
  let totalRows = 0;
  
  const csvParser = new LineParser();
  
  const batcher = new BatchTransform(1000);  // 1000행씩 배치
  
  const dbWriter = new Writable({
    objectMode: true,
    highWaterMark: 4,  // 동시 배치 4개까지만 버퍼링
    
    async write(batch: string[], encoding, callback) {
      try {
        const values = batch.map(line => {
          const [name, email, age] = line.split(',');
          return `('${name}','${email}',${age})`;
        }).join(',');
        
        await pool.query(
          `INSERT INTO users (name, email, age) VALUES ${values}`
        );
        
        totalRows += batch.length;
        if (totalRows % 10000 === 0) {
          console.log(`${totalRows} rows imported...`);
        }
        callback();
      } catch (err) {
        callback(err as Error);
      }
    }
  });

  await pipeline(
    createReadStream(filePath),
    csvParser,
    batcher,
    dbWriter
  );
  
  console.log(`완료: ${totalRows} rows imported`);
  // → 메모리 사용량: ~50MB 이하 (100만 행이어도!)
}

실전: HTTP 스트리밍 프록시

import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';

// Express/NestJS에서 대용량 API 응답 스트리밍
app.get('/export/users', async (req, res) => {
  const cursor = db.collection('users').find().cursor();
  
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Content-Encoding', 'gzip');
  
  const jsonSerializer = new Transform({
    objectMode: true,
    transform(doc, encoding, callback) {
      callback(null, JSON.stringify(doc) + 'n');
    }
  });

  try {
    await pipeline(
      cursor,          // MongoDB cursor (Readable)
      jsonSerializer,  // 객체 → NDJSON 변환
      createGzip(),    // Gzip 압축
      res              // HTTP Response (Writable)
    );
  } catch (err) {
    // 클라이언트 연결 끊김 등 에러 처리
    if (!res.headersSent) {
      res.status(500).json({ error: 'Export failed' });
    }
  }
});

// Async Iterator로 Readable 생성 (Node.js 16+)
import { Readable } from 'node:stream';

async function* generateData() {
  for (let i = 0; i < 1000000; i++) {
    yield JSON.stringify({ id: i, data: `row-${i}` }) + 'n';
    
    // 매 1000개마다 이벤트 루프 양보
    if (i % 1000 === 0) {
      await new Promise(resolve => setImmediate(resolve));
    }
  }
}

const readable = Readable.from(generateData());

for await…of 패턴

// Readable Stream을 async iterator로 소비
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

// 줄 단위 처리 (readline + async iterator)
const rl = createInterface({
  input: createReadStream('large-log.txt'),
  crlfDelay: Infinity,
});

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) {
    errorCount++;
    // 백프레셔가 자동으로 적용됨
    // for await가 다음 청크를 요청하기 전까지 읽기 일시 중지
  }
}
console.log(`Error lines: ${errorCount}`);

// ⚠️ 주의: for await에서 break하면 스트림이 destroy됨
for await (const chunk of readable) {
  if (found) break;  // → readable.destroy() 자동 호출
}

메모리 비교: Buffer vs Stream

방식 1GB 파일 처리 메모리 첫 바이트 응답 시간 에러 시 정리
fs.readFile (Buffer) ~1GB+ 파일 전체 읽은 후 GC 의존
Stream + pipe() ~16KB (highWaterMark) 첫 청크 즉시 수동 정리 필요
Stream + pipeline() ~16KB (highWaterMark) 첫 청크 즉시 자동 destroy

NestJS 성능 최적화 실전 가이드에서 Stream을 활용한 API 성능 개선 사례도 확인하세요.

정리

Node.js Streams는 대용량 데이터 처리의 핵심입니다. 백프레셔를 통해 생산자-소비자 간 속도 차이를 자동 조절하고, pipeline()으로 에러 시 모든 스트림을 안전하게 정리합니다. Transform Stream으로 줄 파싱, 배치 처리, 비동기 변환을 구현하고, async iterator로 직관적인 소비 패턴을 사용할 수 있습니다. 파일, HTTP, DB 커서 등 I/O 바운드 작업에서 Stream을 활용하면 메모리 효율이 극적으로 향상됩니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux