Streams

Streams — Processar Dados sem Explodir Memória

Streams são a abstração fundamental do Node.js para processar dados incrementalmente, em pedaços (chunks), sem precisar carregar o conteúdo completo na memória. Um arquivo de 10GB pode ser processado com ~64KB de memória constante. Isso não é otimização prematura — é a diferença entre uma aplicação que funciona e uma que morre com FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory.

const fs = require('node:fs');

// Buffering: carrega TUDO na memória antes de processar
// Arquivo de 2GB = 2GB de heap. Em produção com múltiplas requests simultâneas, crash garantido.
const data = fs.readFileSync('dataset-2gb.csv'); // Aloca 2GB de uma vez
processData(data); // Só começa depois que TUDO foi lido

// Streaming: processa chunk por chunk (~64KB cada)
// Arquivo de 2GB = ~64KB de heap constante. Pode processar terabytes.
const stream = fs.createReadStream('dataset-2gb.csv');
stream.on('data', (chunk) => {
  processChunk(chunk); // Começa a processar imediatamente
});

Os Quatro Tipos de Stream

Toda stream no Node.js herda de EventEmitter e implementa uma das quatro interfaces:

TipoDescriçãoExemploEventos principais
ReadableFonte de dadosfs.createReadStream, http.IncomingMessagedata, end, error, readable
WritableDestino de dadosfs.createWriteStream, http.ServerResponsedrain, finish, error, close
TransformModifica dados em trânsitozlib.createGzip, crypto.createCipherCombina Readable + Writable
DuplexLê e escreve independentementenet.Socket, tls.TLSSocketCombina Readable + Writable

A diferença entre Transform e Duplex é sutil mas importante: num Transform, a saída é derivada da entrada (compressão, criptografia). Num Duplex, leitura e escrita são independentes (um socket TCP lê dados do servidor e escreve dados para o servidor — são dois fluxos separados).

Modos de Consumo: Flowing vs Paused

Readable streams operam em dois modos:

const readable = fs.createReadStream('file.txt');

// MODO FLOWING: dados são emitidos automaticamente via evento 'data'
// Ativado por: .on('data'), .pipe(), .resume()
readable.on('data', (chunk) => {
  console.log(`Recebido ${chunk.length} bytes`);
});

// MODO PAUSED: dados são lidos manualmente via .read()
// É o modo padrão. Útil para controle fino de fluxo.
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {  // .read() retorna null quando não há mais dados no buffer
    console.log(`Lido manualmente: ${chunk.length} bytes`);
  }
});

Backpressure: O Mecanismo Mais Importante

Backpressure é o que impede sua aplicação de explodir quando o produtor (readable) é mais rápido que o consumidor (writable). Sem backpressure, os dados se acumulam na memória até o processo ser morto pelo OOM killer.

O mecanismo funciona assim:

  1. writable.write(chunk) retorna false quando o buffer interno atinge o highWaterMark
  2. O readable deve parar de enviar dados até o evento drain ser emitido
  3. drain indica que o buffer do writable foi esvaziado e pode receber mais dados
const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('output.bin');

// Implementação manual de backpressure (para entender o mecanismo)
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Buffer do writable está cheio! Pausar leitura.
    console.log('Backpressure ativada — pausando leitura');
    readable.pause();

    writable.once('drain', () => {
      // Buffer esvaziou, pode continuar lendo
      console.log('Drain — retomando leitura');
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end(); // Sinaliza fim da escrita
});

highWaterMark: O Limiar do Buffer

highWaterMark define o tamanho máximo do buffer interno (em bytes, ou em número de objetos no object mode). O padrão é 16KB para streams genéricos e 64KB para fs streams.

// Buffer de 1MB — útil para escritas em disco (menos syscalls, mais throughput)
const writable = fs.createWriteStream('output.bin', { highWaterMark: 1024 * 1024 });

// Buffer de 1KB — útil para responses HTTP (menor latência de primeiro byte)
const smallBuffer = fs.createReadStream('file.txt', { highWaterMark: 1024 });

// O highWaterMark é uma SUGESTÃO, não um limite rígido.
// O buffer pode ultrapassar esse valor temporariamente.

Pipeline vs Pipe: Sempre Use Pipeline

.pipe() foi a API original para conectar streams, mas tem um problema grave: não propaga erros corretamente e não faz cleanup de streams intermediários.

const { pipeline } = require('node:stream');
const { promisify } = require('node:util');
const pipelineAsync = promisify(pipeline);
// Ou a partir do Node 15+:
const { pipeline: pipelineAsync } = require('node:stream/promises');

const fs = require('node:fs');
const zlib = require('node:zlib');
const crypto = require('node:crypto');

// .pipe() — NÃO USE EM PRODUÇÃO
// Se o gzip falhar, o readStream e writeStream ficam abertos (leak de file descriptors)
fs.createReadStream('data.json')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('data.json.gz'));
// Sem tratamento de erro. Se uma stream falhar, as outras não são notificadas.

// pipeline() — SEMPRE USE ISTO
// Propaga erros, fecha todas as streams no erro, cleanup automático
try {
  await pipelineAsync(
    fs.createReadStream('data.json'),
    zlib.createGzip({ level: 9 }),                          // Compressão máxima
    crypto.createCipheriv('aes-256-cbc', key, iv),          // Criptografia
    fs.createWriteStream('data.json.gz.enc'),
  );
  console.log('Pipeline concluída com sucesso');
} catch (err) {
  console.error('Pipeline falhou:', err.message);
  // Todas as streams já foram fechadas automaticamente
}

Criando Streams Customizados

Para implementar sua própria lógica de streaming, você estende as classes base e implementa os métodos internos (_read, _write, _transform).

Custom Readable: Gerador de Dados

const { Readable } = require('node:stream');

// Stream que lê linhas de um banco de dados em batches (cursor pagination)
class DatabaseCursorStream extends Readable {
  constructor(db, query, options = {}) {
    super({ objectMode: true, highWaterMark: 100 }); // 100 objetos no buffer
    this.db = db;
    this.query = query;
    this.batchSize = options.batchSize || 500;
    this.offset = 0;
    this.finished = false;
  }

  async _read() {
    if (this.finished) {
      this.push(null); // Sinaliza fim do stream
      return;
    }

    try {
      const rows = await this.db.query(
        `${this.query} LIMIT ${this.batchSize} OFFSET ${this.offset}`
      );

      if (rows.length === 0) {
        this.finished = true;
        this.push(null);
        return;
      }

      for (const row of rows) {
        // push() retorna false se o buffer estiver cheio
        // Nesse caso, _read() será chamado novamente quando houver espaço
        if (!this.push(row)) break;
      }

      this.offset += rows.length;
    } catch (err) {
      this.destroy(err); // Propaga erro e faz cleanup
    }
  }

  _destroy(err, callback) {
    // Cleanup: fechar cursor, liberar conexão
    this.db.release().then(() => callback(err)).catch(callback);
  }
}

// Uso: exportar 10 milhões de linhas sem carregar tudo na memória
const stream = new DatabaseCursorStream(pool, 'SELECT * FROM events ORDER BY id');

Custom Transform: Processamento em Pipeline

const { Transform } = require('node:stream');

// Transform que converte CSV em JSON, linha por linha
class CsvToJsonTransform extends Transform {
  constructor(options = {}) {
    super({ readableObjectMode: true, writableObjectMode: false });
    this.headers = null;
    this.remainder = ''; // Buffer para linhas parciais
    this.delimiter = options.delimiter || ',';
    this.lineCount = 0;
  }

  _transform(chunk, encoding, callback) {
    try {
      const text = this.remainder + chunk.toString('utf8');
      const lines = text.split('\n');

      // A última "linha" pode estar incompleta — guardar para o próximo chunk
      this.remainder = lines.pop();

      for (const line of lines) {
        if (line.trim() === '') continue;

        const values = this._parseLine(line);

        if (!this.headers) {
          this.headers = values;
          continue;
        }

        const obj = {};
        for (let i = 0; i < this.headers.length; i++) {
          obj[this.headers[i]] = values[i] ?? null;
        }

        this.lineCount++;
        this.push(obj); // Emite um objeto JSON por linha do CSV
      }

      callback(); // Sinaliza que terminou de processar este chunk
    } catch (err) {
      callback(err); // Propaga erro
    }
  }

  _flush(callback) {
    // Chamado quando o readable source termina — processa o que sobrou
    if (this.remainder.trim() && this.headers) {
      const values = this._parseLine(this.remainder);
      const obj = {};
      for (let i = 0; i < this.headers.length; i++) {
        obj[this.headers[i]] = values[i] ?? null;
      }
      this.push(obj);
      this.lineCount++;
    }
    callback();
  }

  _parseLine(line) {
    // Parser simplificado — em produção use uma lib como csv-parse
    return line.split(this.delimiter).map(v => v.trim().replace(/^"|"$/g, ''));
  }
}

Custom Writable: Batch Insert

const { Writable } = require('node:stream');

// Writable que acumula objetos e faz batch insert no PostgreSQL
class BatchInsertStream extends Writable {
  constructor(db, table, options = {}) {
    super({ objectMode: true, highWaterMark: options.batchSize || 1000 });
    this.db = db;
    this.table = table;
    this.batchSize = options.batchSize || 1000;
    this.buffer = [];
    this.totalInserted = 0;
  }

  async _write(obj, encoding, callback) {
    this.buffer.push(obj);

    if (this.buffer.length >= this.batchSize) {
      try {
        await this._flush_buffer();
        callback();
      } catch (err) {
        callback(err);
      }
    } else {
      callback(); // Ainda não atingiu o batch — aceita mais dados
    }
  }

  async _final(callback) {
    // Chamado quando o stream é finalizado — insere o que sobrou
    try {
      if (this.buffer.length > 0) {
        await this._flush_buffer();
      }
      console.log(`Total inserido: ${this.totalInserted} linhas`);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async _flush_buffer() {
    const batch = this.buffer.splice(0, this.batchSize);
    const columns = Object.keys(batch[0]);
    const values = batch.map((row, i) =>
      `(${columns.map((_, j) => `$${i * columns.length + j + 1}`).join(',')})`
    ).join(',');

    const params = batch.flatMap(row => columns.map(col => row[col]));
    await this.db.query(
      `INSERT INTO ${this.table} (${columns.join(',')}) VALUES ${values}`,
      params
    );
    this.totalInserted += batch.length;
  }
}

Object Mode: Streams de Objetos

Por padrão, streams operam com Buffer (dados binários) ou string. No object mode, cada chunk é um objeto JavaScript arbitrário. O highWaterMark passa a contar número de objetos em vez de bytes.

const { Transform } = require('node:stream');

// Pipeline completa: CSV -> JSON objects -> filtragem -> batch insert
const { pipeline } = require('node:stream/promises');

await pipeline(
  fs.createReadStream('users-10M.csv'),           // Readable (bytes)
  new CsvToJsonTransform(),                         // Transform (bytes -> objects)
  new Transform({                                   // Transform (filter)
    objectMode: true,
    transform(user, enc, cb) {
      // Filtra apenas usuários ativos com email válido
      if (user.status === 'active' && user.email?.includes('@')) {
        this.push(user);
      }
      cb();
    },
  }),
  new Transform({                                   // Transform (enrich)
    objectMode: true,
    transform(user, enc, cb) {
      user.imported_at = new Date().toISOString();
      user.email = user.email.toLowerCase().trim();
      this.push(user);
      cb();
    },
  }),
  new BatchInsertStream(pool, 'users', { batchSize: 5000 }),  // Writable (objects)
);

console.log('10 milhões de usuários importados com ~64KB de memória');

Web Streams API: Compatibilidade com Browsers

A partir do Node.js 18+, a Web Streams API (WHATWG Streams) está disponível globalmente. Essa é a mesma API que os browsers implementam. Útil para código isomórfico e integração com APIs modernas como fetch().

// Web Streams — funciona identicamente no Node.js e no browser
const response = await fetch('https://api.example.com/large-dataset.json');

// response.body é um ReadableStream (Web API)
const reader = response.body.getReader();
const decoder = new TextDecoder();

let totalBytes = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  totalBytes += value.byteLength;
  process.stdout.write(`\rBaixado: ${(totalBytes / 1024 / 1024).toFixed(1)} MB`);
}

// Conversão entre Node Streams e Web Streams
const { Readable } = require('node:stream');

// Node Readable -> Web ReadableStream
const nodeStream = fs.createReadStream('file.txt');
const webStream = Readable.toWeb(nodeStream);

// Web ReadableStream -> Node Readable
const backToNode = Readable.fromWeb(webStream);
// TransformStream (Web API) — equivalente ao Transform do Node
const uppercaseTransform = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    const upper = new TextEncoder().encode(text.toUpperCase());
    controller.enqueue(upper);
  },
});

// Piping com Web Streams
const readable = (await fetch('https://example.com/text.txt')).body;
const writable = new WritableStream({
  write(chunk) {
    console.log('Chunk recebido:', new TextDecoder().decode(chunk));
  },
});

await readable.pipeThrough(uppercaseTransform).pipeTo(writable);

Async Iterators: A Interface Moderna

A partir do Node.js 10+, toda Readable stream implementa o protocolo Symbol.asyncIterator, permitindo o uso de for await...of. Essa é a forma mais limpa e legível de consumir streams.

const fs = require('node:fs');
const readline = require('node:readline');

// Processar arquivo linha por linha com async iterator
const fileStream = fs.createReadStream('access.log');
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });

const statusCounts = { '2xx': 0, '3xx': 0, '4xx': 0, '5xx': 0 };

for await (const line of rl) {
  const match = line.match(/HTTP\/\d\.\d" (\d)\d{2}/);
  if (match) {
    statusCounts[`${match[1]}xx`]++;
  }
}

console.log('Distribuição de status codes:', statusCounts);

Generators como Readable Streams

Async generators podem ser convertidos em Readable streams, criando fontes de dados elegantes:

const { Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Generator que produz dados paginados de uma API
async function* fetchAllPages(baseUrl) {
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${baseUrl}?page=${page}&limit=100`);
    const data = await response.json();

    for (const item of data.results) {
      yield item; // Cada item vira um chunk do stream
    }

    hasMore = data.hasNextPage;
    page++;
  }
}

// Usar o generator como stream numa pipeline
await pipeline(
  Readable.from(fetchAllPages('https://api.example.com/products')),
  new Transform({
    objectMode: true,
    transform(product, enc, cb) {
      // Normaliza preço para centavos
      product.price_cents = Math.round(product.price * 100);
      delete product.price;
      this.push(JSON.stringify(product) + '\n');
      cb();
    },
  }),
  fs.createWriteStream('products-normalized.jsonl'),
);

Aplicações Práticas

Upload Multipart com Streaming

const http = require('node:http');
const { pipeline } = require('node:stream/promises');
const { createHash } = require('node:crypto');

const server = http.createServer(async (req, res) => {
  if (req.method === 'PUT' && req.url.startsWith('/upload/')) {
    const filename = req.url.split('/').pop();
    const expectedSize = parseInt(req.headers['content-length'], 10);

    // Hash do arquivo em tempo real durante o upload (sem buffer intermediário)
    const hash = createHash('sha256');
    let receivedBytes = 0;

    const progressTransform = new Transform({
      transform(chunk, enc, cb) {
        receivedBytes += chunk.length;
        hash.update(chunk);

        // Log de progresso a cada 10MB
        if (receivedBytes % (10 * 1024 * 1024) < chunk.length) {
          const pct = ((receivedBytes / expectedSize) * 100).toFixed(1);
          console.log(`Upload ${filename}: ${pct}%`);
        }

        cb(null, chunk); // Passa o chunk adiante sem modificar
      },
    });

    try {
      await pipeline(
        req,                                              // Readable: corpo da request HTTP
        progressTransform,                                // Transform: progresso + hash
        fs.createWriteStream(`/uploads/${filename}`),     // Writable: arquivo no disco
      );

      const checksum = hash.digest('hex');
      res.writeHead(201, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ filename, size: receivedBytes, sha256: checksum }));
    } catch (err) {
      // Cleanup: remover arquivo parcial
      fs.unlink(`/uploads/${filename}`, () => {});
      res.writeHead(500);
      res.end(JSON.stringify({ error: err.message }));
    }
  }
});

server.listen(3000);

ETL Pipeline: CSV para PostgreSQL

const { pipeline } = require('node:stream/promises');
const { Transform } = require('node:stream');
const { createReadStream } = require('node:fs');
const { createGunzip } = require('node:zlib');
const { parse } = require('csv-parse'); // npm install csv-parse

async function etlPipeline(inputFile, db) {
  let processed = 0;
  let errors = 0;
  const startTime = Date.now();

  const validate = new Transform({
    objectMode: true,
    transform(record, enc, cb) {
      // Validação e limpeza
      if (!record.email || !record.email.includes('@')) {
        errors++;
        return cb(); // Descarta registros inválidos (não propaga erro)
      }

      this.push({
        email: record.email.toLowerCase().trim(),
        name: record.name?.trim() || null,
        amount: parseFloat(record.amount) || 0,
        created_at: new Date(record.date).toISOString(),
      });

      processed++;
      if (processed % 100_000 === 0) {
        const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
        const rate = Math.round(processed / parseFloat(elapsed));
        console.log(`Processados: ${processed.toLocaleString()} | Erros: ${errors} | ${rate} registros/s`);
      }

      cb();
    },
  });

  await pipeline(
    createReadStream(inputFile),         // Lê o arquivo .csv.gz
    createGunzip(),                       // Descomprime gzip on-the-fly
    parse({                               // Parse CSV -> objetos
      columns: true,                      // Usa a primeira linha como headers
      skip_empty_lines: true,
      cast: false,                        // Não fazer cast automático
      relax_column_count: true,
    }),
    validate,                             // Validação e limpeza
    new BatchInsertStream(db, 'transactions', { batchSize: 5000 }),
  );

  const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
  console.log(`\nETL concluído: ${processed.toLocaleString()} registros em ${totalTime}s | ${errors} erros`);
}

// Uso: processar arquivo de 50GB comprimido com ~100MB de memória
await etlPipeline('transactions-2024.csv.gz', pool);

HTTP Response Streaming (Server-Sent Events)

const http = require('node:http');

const server = http.createServer((req, res) => {
  if (req.url === '/events') {
    // Server-Sent Events — streaming de dados do servidor para o cliente
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });

    // Envia heartbeat a cada 15s para manter a conexão viva
    const heartbeat = setInterval(() => {
      res.write(':heartbeat\n\n'); // Comentário SSE — ignorado pelo cliente
    }, 15000);

    // Simula stream de dados em tempo real
    let eventId = 0;
    const dataInterval = setInterval(() => {
      const event = {
        id: ++eventId,
        timestamp: Date.now(),
        cpu: (Math.random() * 100).toFixed(1),
        memory: process.memoryUsage().heapUsed,
      };

      res.write(`id: ${event.id}\n`);
      res.write(`event: metrics\n`);
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }, 1000);

    // Cleanup quando o cliente desconecta
    req.on('close', () => {
      clearInterval(heartbeat);
      clearInterval(dataInterval);
      console.log('Cliente desconectou do SSE');
    });
  }
});

server.listen(3000);

Performance: Streaming vs Buffering

A diferença de consumo de memória entre buffering e streaming é dramática e cresce linearmente com o tamanho do input:

const { performance } = require('node:perf_hooks');

// Benchmark: processar arquivo de N tamanho
async function benchmarkBuffered(filepath) {
  const start = performance.now();
  const memBefore = process.memoryUsage().heapUsed;

  const data = require('node:fs').readFileSync(filepath);
  // Simula processamento: contar linhas
  const lines = data.toString().split('\n').length;

  const memAfter = process.memoryUsage().heapUsed;
  const duration = performance.now() - start;

  return {
    mode: 'buffered',
    lines,
    duration: `${duration.toFixed(0)}ms`,
    memoryDelta: `${((memAfter - memBefore) / 1024 / 1024).toFixed(1)}MB`,
  };
}

async function benchmarkStreamed(filepath) {
  const start = performance.now();
  const memBefore = process.memoryUsage().heapUsed;
  let lines = 0;

  const rl = require('node:readline').createInterface({
    input: require('node:fs').createReadStream(filepath),
    crlfDelay: Infinity,
  });

  for await (const _ of rl) {
    lines++;
  }

  const memAfter = process.memoryUsage().heapUsed;
  const duration = performance.now() - start;

  return {
    mode: 'streamed',
    lines,
    duration: `${duration.toFixed(0)}ms`,
    memoryDelta: `${((memAfter - memBefore) / 1024 / 1024).toFixed(1)}MB`,
  };
}

// Resultados típicos para arquivo de 1GB:
// buffered: { duration: '3200ms', memoryDelta: '1024.0MB' }
// streamed: { duration: '2800ms', memoryDelta: '2.1MB' }
//
// Para 5GB (com --max-old-space-size=8192):
// buffered: { duration: '18000ms', memoryDelta: '5120.0MB' }
// streamed: { duration: '14000ms', memoryDelta: '2.3MB' }  <-- memória CONSTANTE

Resumo de Decisão

Pergunta: "Devo usar streams ou carregar tudo na memória?"

1. O tamanho do input é previsível e pequeno (< 50MB)?
   → Buffering é aceitável. Mais simples de debugar.

2. O input pode ser arbitrariamente grande? (upload, arquivo, query de DB)
   → SEMPRE use streams. Sem exceção.

3. Preciso de transformações encadeadas? (parse -> validate -> enrich -> save)
   → pipeline() com Transform streams

4. Preciso de compatibilidade Node.js + browser?
   → Web Streams API (ReadableStream, TransformStream)

5. Quero código legível sem callbacks?
   → for await...of com async iterators

6. .pipe() ou pipeline()?
   → SEMPRE pipeline(). Sem exceção.
   → .pipe() não faz cleanup em caso de erro = leak de memória/file descriptors

Referencias e Fontes

  • Node.js Streams Documentationhttps://nodejs.org/api/stream.html — Documentacao oficial sobre Readable, Writable, Transform e pipeline
  • “Node.js Design Patterns” — Mario Casciaro, Luciano Mammino — Cobertura detalhada de Streams, backpressure e patterns de composicao