Streams
Streams — Processar Dados sem Explodir Memória
Streams são a abstração fundamental do Node.js para processar dados incrementalmente, em pedaços (chunks), sem precisar carregar o conteúdo completo na memória. Um arquivo de 10GB pode ser processado com ~64KB de memória constante. Isso não é otimização prematura — é a diferença entre uma aplicação que funciona e uma que morre com FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory.
const fs = require('node:fs');
// Buffering: carrega TUDO na memória antes de processar
// Arquivo de 2GB = 2GB de heap. Em produção com múltiplas requests simultâneas, crash garantido.
const data = fs.readFileSync('dataset-2gb.csv'); // Aloca 2GB de uma vez
processData(data); // Só começa depois que TUDO foi lido
// Streaming: processa chunk por chunk (~64KB cada)
// Arquivo de 2GB = ~64KB de heap constante. Pode processar terabytes.
const stream = fs.createReadStream('dataset-2gb.csv');
stream.on('data', (chunk) => {
processChunk(chunk); // Começa a processar imediatamente
});
Os Quatro Tipos de Stream
Toda stream no Node.js herda de EventEmitter e implementa uma das quatro interfaces:
| Tipo | Descrição | Exemplo | Eventos principais |
|---|---|---|---|
| Readable | Fonte de dados | fs.createReadStream, http.IncomingMessage | data, end, error, readable |
| Writable | Destino de dados | fs.createWriteStream, http.ServerResponse | drain, finish, error, close |
| Transform | Modifica dados em trânsito | zlib.createGzip, crypto.createCipher | Combina Readable + Writable |
| Duplex | Lê e escreve independentemente | net.Socket, tls.TLSSocket | Combina Readable + Writable |
A diferença entre Transform e Duplex é sutil mas importante: num Transform, a saída é derivada da entrada (compressão, criptografia). Num Duplex, leitura e escrita são independentes (um socket TCP lê dados do servidor e escreve dados para o servidor — são dois fluxos separados).
Modos de Consumo: Flowing vs Paused
Readable streams operam em dois modos:
const readable = fs.createReadStream('file.txt');
// MODO FLOWING: dados são emitidos automaticamente via evento 'data'
// Ativado por: .on('data'), .pipe(), .resume()
readable.on('data', (chunk) => {
console.log(`Recebido ${chunk.length} bytes`);
});
// MODO PAUSED: dados são lidos manualmente via .read()
// É o modo padrão. Útil para controle fino de fluxo.
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) { // .read() retorna null quando não há mais dados no buffer
console.log(`Lido manualmente: ${chunk.length} bytes`);
}
});
Backpressure: O Mecanismo Mais Importante
Backpressure é o que impede sua aplicação de explodir quando o produtor (readable) é mais rápido que o consumidor (writable). Sem backpressure, os dados se acumulam na memória até o processo ser morto pelo OOM killer.
O mecanismo funciona assim:
writable.write(chunk)retornafalsequando o buffer interno atinge ohighWaterMark- O readable deve parar de enviar dados até o evento
drainser emitido drainindica que o buffer do writable foi esvaziado e pode receber mais dados
const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('output.bin');
// Implementação manual de backpressure (para entender o mecanismo)
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Buffer do writable está cheio! Pausar leitura.
console.log('Backpressure ativada — pausando leitura');
readable.pause();
writable.once('drain', () => {
// Buffer esvaziou, pode continuar lendo
console.log('Drain — retomando leitura');
readable.resume();
});
}
});
readable.on('end', () => {
writable.end(); // Sinaliza fim da escrita
});
highWaterMark: O Limiar do Buffer
highWaterMark define o tamanho máximo do buffer interno (em bytes, ou em número de objetos no object mode). O padrão é 16KB para streams genéricos e 64KB para fs streams.
// Buffer de 1MB — útil para escritas em disco (menos syscalls, mais throughput)
const writable = fs.createWriteStream('output.bin', { highWaterMark: 1024 * 1024 });
// Buffer de 1KB — útil para responses HTTP (menor latência de primeiro byte)
const smallBuffer = fs.createReadStream('file.txt', { highWaterMark: 1024 });
// O highWaterMark é uma SUGESTÃO, não um limite rígido.
// O buffer pode ultrapassar esse valor temporariamente.
Pipeline vs Pipe: Sempre Use Pipeline
.pipe() foi a API original para conectar streams, mas tem um problema grave: não propaga erros corretamente e não faz cleanup de streams intermediários.
const { pipeline } = require('node:stream');
const { promisify } = require('node:util');
const pipelineAsync = promisify(pipeline);
// Ou a partir do Node 15+:
const { pipeline: pipelineAsync } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
const crypto = require('node:crypto');
// .pipe() — NÃO USE EM PRODUÇÃO
// Se o gzip falhar, o readStream e writeStream ficam abertos (leak de file descriptors)
fs.createReadStream('data.json')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('data.json.gz'));
// Sem tratamento de erro. Se uma stream falhar, as outras não são notificadas.
// pipeline() — SEMPRE USE ISTO
// Propaga erros, fecha todas as streams no erro, cleanup automático
try {
await pipelineAsync(
fs.createReadStream('data.json'),
zlib.createGzip({ level: 9 }), // Compressão máxima
crypto.createCipheriv('aes-256-cbc', key, iv), // Criptografia
fs.createWriteStream('data.json.gz.enc'),
);
console.log('Pipeline concluída com sucesso');
} catch (err) {
console.error('Pipeline falhou:', err.message);
// Todas as streams já foram fechadas automaticamente
}
Criando Streams Customizados
Para implementar sua própria lógica de streaming, você estende as classes base e implementa os métodos internos (_read, _write, _transform).
Custom Readable: Gerador de Dados
const { Readable } = require('node:stream');
// Stream que lê linhas de um banco de dados em batches (cursor pagination)
class DatabaseCursorStream extends Readable {
constructor(db, query, options = {}) {
super({ objectMode: true, highWaterMark: 100 }); // 100 objetos no buffer
this.db = db;
this.query = query;
this.batchSize = options.batchSize || 500;
this.offset = 0;
this.finished = false;
}
async _read() {
if (this.finished) {
this.push(null); // Sinaliza fim do stream
return;
}
try {
const rows = await this.db.query(
`${this.query} LIMIT ${this.batchSize} OFFSET ${this.offset}`
);
if (rows.length === 0) {
this.finished = true;
this.push(null);
return;
}
for (const row of rows) {
// push() retorna false se o buffer estiver cheio
// Nesse caso, _read() será chamado novamente quando houver espaço
if (!this.push(row)) break;
}
this.offset += rows.length;
} catch (err) {
this.destroy(err); // Propaga erro e faz cleanup
}
}
_destroy(err, callback) {
// Cleanup: fechar cursor, liberar conexão
this.db.release().then(() => callback(err)).catch(callback);
}
}
// Uso: exportar 10 milhões de linhas sem carregar tudo na memória
const stream = new DatabaseCursorStream(pool, 'SELECT * FROM events ORDER BY id');
Custom Transform: Processamento em Pipeline
const { Transform } = require('node:stream');
// Transform que converte CSV em JSON, linha por linha
class CsvToJsonTransform extends Transform {
constructor(options = {}) {
super({ readableObjectMode: true, writableObjectMode: false });
this.headers = null;
this.remainder = ''; // Buffer para linhas parciais
this.delimiter = options.delimiter || ',';
this.lineCount = 0;
}
_transform(chunk, encoding, callback) {
try {
const text = this.remainder + chunk.toString('utf8');
const lines = text.split('\n');
// A última "linha" pode estar incompleta — guardar para o próximo chunk
this.remainder = lines.pop();
for (const line of lines) {
if (line.trim() === '') continue;
const values = this._parseLine(line);
if (!this.headers) {
this.headers = values;
continue;
}
const obj = {};
for (let i = 0; i < this.headers.length; i++) {
obj[this.headers[i]] = values[i] ?? null;
}
this.lineCount++;
this.push(obj); // Emite um objeto JSON por linha do CSV
}
callback(); // Sinaliza que terminou de processar este chunk
} catch (err) {
callback(err); // Propaga erro
}
}
_flush(callback) {
// Chamado quando o readable source termina — processa o que sobrou
if (this.remainder.trim() && this.headers) {
const values = this._parseLine(this.remainder);
const obj = {};
for (let i = 0; i < this.headers.length; i++) {
obj[this.headers[i]] = values[i] ?? null;
}
this.push(obj);
this.lineCount++;
}
callback();
}
_parseLine(line) {
// Parser simplificado — em produção use uma lib como csv-parse
return line.split(this.delimiter).map(v => v.trim().replace(/^"|"$/g, ''));
}
}
Custom Writable: Batch Insert
const { Writable } = require('node:stream');
// Writable que acumula objetos e faz batch insert no PostgreSQL
class BatchInsertStream extends Writable {
constructor(db, table, options = {}) {
super({ objectMode: true, highWaterMark: options.batchSize || 1000 });
this.db = db;
this.table = table;
this.batchSize = options.batchSize || 1000;
this.buffer = [];
this.totalInserted = 0;
}
async _write(obj, encoding, callback) {
this.buffer.push(obj);
if (this.buffer.length >= this.batchSize) {
try {
await this._flush_buffer();
callback();
} catch (err) {
callback(err);
}
} else {
callback(); // Ainda não atingiu o batch — aceita mais dados
}
}
async _final(callback) {
// Chamado quando o stream é finalizado — insere o que sobrou
try {
if (this.buffer.length > 0) {
await this._flush_buffer();
}
console.log(`Total inserido: ${this.totalInserted} linhas`);
callback();
} catch (err) {
callback(err);
}
}
async _flush_buffer() {
const batch = this.buffer.splice(0, this.batchSize);
const columns = Object.keys(batch[0]);
const values = batch.map((row, i) =>
`(${columns.map((_, j) => `$${i * columns.length + j + 1}`).join(',')})`
).join(',');
const params = batch.flatMap(row => columns.map(col => row[col]));
await this.db.query(
`INSERT INTO ${this.table} (${columns.join(',')}) VALUES ${values}`,
params
);
this.totalInserted += batch.length;
}
}
Object Mode: Streams de Objetos
Por padrão, streams operam com Buffer (dados binários) ou string. No object mode, cada chunk é um objeto JavaScript arbitrário. O highWaterMark passa a contar número de objetos em vez de bytes.
const { Transform } = require('node:stream');
// Pipeline completa: CSV -> JSON objects -> filtragem -> batch insert
const { pipeline } = require('node:stream/promises');
await pipeline(
fs.createReadStream('users-10M.csv'), // Readable (bytes)
new CsvToJsonTransform(), // Transform (bytes -> objects)
new Transform({ // Transform (filter)
objectMode: true,
transform(user, enc, cb) {
// Filtra apenas usuários ativos com email válido
if (user.status === 'active' && user.email?.includes('@')) {
this.push(user);
}
cb();
},
}),
new Transform({ // Transform (enrich)
objectMode: true,
transform(user, enc, cb) {
user.imported_at = new Date().toISOString();
user.email = user.email.toLowerCase().trim();
this.push(user);
cb();
},
}),
new BatchInsertStream(pool, 'users', { batchSize: 5000 }), // Writable (objects)
);
console.log('10 milhões de usuários importados com ~64KB de memória');
Web Streams API: Compatibilidade com Browsers
A partir do Node.js 18+, a Web Streams API (WHATWG Streams) está disponível globalmente. Essa é a mesma API que os browsers implementam. Útil para código isomórfico e integração com APIs modernas como fetch().
// Web Streams — funciona identicamente no Node.js e no browser
const response = await fetch('https://api.example.com/large-dataset.json');
// response.body é um ReadableStream (Web API)
const reader = response.body.getReader();
const decoder = new TextDecoder();
let totalBytes = 0;
while (true) {
const { done, value } = await reader.read();
if (done) break;
totalBytes += value.byteLength;
process.stdout.write(`\rBaixado: ${(totalBytes / 1024 / 1024).toFixed(1)} MB`);
}
// Conversão entre Node Streams e Web Streams
const { Readable } = require('node:stream');
// Node Readable -> Web ReadableStream
const nodeStream = fs.createReadStream('file.txt');
const webStream = Readable.toWeb(nodeStream);
// Web ReadableStream -> Node Readable
const backToNode = Readable.fromWeb(webStream);
// TransformStream (Web API) — equivalente ao Transform do Node
const uppercaseTransform = new TransformStream({
transform(chunk, controller) {
const text = new TextDecoder().decode(chunk);
const upper = new TextEncoder().encode(text.toUpperCase());
controller.enqueue(upper);
},
});
// Piping com Web Streams
const readable = (await fetch('https://example.com/text.txt')).body;
const writable = new WritableStream({
write(chunk) {
console.log('Chunk recebido:', new TextDecoder().decode(chunk));
},
});
await readable.pipeThrough(uppercaseTransform).pipeTo(writable);
Async Iterators: A Interface Moderna
A partir do Node.js 10+, toda Readable stream implementa o protocolo Symbol.asyncIterator, permitindo o uso de for await...of. Essa é a forma mais limpa e legível de consumir streams.
const fs = require('node:fs');
const readline = require('node:readline');
// Processar arquivo linha por linha com async iterator
const fileStream = fs.createReadStream('access.log');
const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
const statusCounts = { '2xx': 0, '3xx': 0, '4xx': 0, '5xx': 0 };
for await (const line of rl) {
const match = line.match(/HTTP\/\d\.\d" (\d)\d{2}/);
if (match) {
statusCounts[`${match[1]}xx`]++;
}
}
console.log('Distribuição de status codes:', statusCounts);
Generators como Readable Streams
Async generators podem ser convertidos em Readable streams, criando fontes de dados elegantes:
const { Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
// Generator que produz dados paginados de uma API
async function* fetchAllPages(baseUrl) {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await fetch(`${baseUrl}?page=${page}&limit=100`);
const data = await response.json();
for (const item of data.results) {
yield item; // Cada item vira um chunk do stream
}
hasMore = data.hasNextPage;
page++;
}
}
// Usar o generator como stream numa pipeline
await pipeline(
Readable.from(fetchAllPages('https://api.example.com/products')),
new Transform({
objectMode: true,
transform(product, enc, cb) {
// Normaliza preço para centavos
product.price_cents = Math.round(product.price * 100);
delete product.price;
this.push(JSON.stringify(product) + '\n');
cb();
},
}),
fs.createWriteStream('products-normalized.jsonl'),
);
Aplicações Práticas
Upload Multipart com Streaming
const http = require('node:http');
const { pipeline } = require('node:stream/promises');
const { createHash } = require('node:crypto');
const server = http.createServer(async (req, res) => {
if (req.method === 'PUT' && req.url.startsWith('/upload/')) {
const filename = req.url.split('/').pop();
const expectedSize = parseInt(req.headers['content-length'], 10);
// Hash do arquivo em tempo real durante o upload (sem buffer intermediário)
const hash = createHash('sha256');
let receivedBytes = 0;
const progressTransform = new Transform({
transform(chunk, enc, cb) {
receivedBytes += chunk.length;
hash.update(chunk);
// Log de progresso a cada 10MB
if (receivedBytes % (10 * 1024 * 1024) < chunk.length) {
const pct = ((receivedBytes / expectedSize) * 100).toFixed(1);
console.log(`Upload ${filename}: ${pct}%`);
}
cb(null, chunk); // Passa o chunk adiante sem modificar
},
});
try {
await pipeline(
req, // Readable: corpo da request HTTP
progressTransform, // Transform: progresso + hash
fs.createWriteStream(`/uploads/${filename}`), // Writable: arquivo no disco
);
const checksum = hash.digest('hex');
res.writeHead(201, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ filename, size: receivedBytes, sha256: checksum }));
} catch (err) {
// Cleanup: remover arquivo parcial
fs.unlink(`/uploads/${filename}`, () => {});
res.writeHead(500);
res.end(JSON.stringify({ error: err.message }));
}
}
});
server.listen(3000);
ETL Pipeline: CSV para PostgreSQL
const { pipeline } = require('node:stream/promises');
const { Transform } = require('node:stream');
const { createReadStream } = require('node:fs');
const { createGunzip } = require('node:zlib');
const { parse } = require('csv-parse'); // npm install csv-parse
async function etlPipeline(inputFile, db) {
let processed = 0;
let errors = 0;
const startTime = Date.now();
const validate = new Transform({
objectMode: true,
transform(record, enc, cb) {
// Validação e limpeza
if (!record.email || !record.email.includes('@')) {
errors++;
return cb(); // Descarta registros inválidos (não propaga erro)
}
this.push({
email: record.email.toLowerCase().trim(),
name: record.name?.trim() || null,
amount: parseFloat(record.amount) || 0,
created_at: new Date(record.date).toISOString(),
});
processed++;
if (processed % 100_000 === 0) {
const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
const rate = Math.round(processed / parseFloat(elapsed));
console.log(`Processados: ${processed.toLocaleString()} | Erros: ${errors} | ${rate} registros/s`);
}
cb();
},
});
await pipeline(
createReadStream(inputFile), // Lê o arquivo .csv.gz
createGunzip(), // Descomprime gzip on-the-fly
parse({ // Parse CSV -> objetos
columns: true, // Usa a primeira linha como headers
skip_empty_lines: true,
cast: false, // Não fazer cast automático
relax_column_count: true,
}),
validate, // Validação e limpeza
new BatchInsertStream(db, 'transactions', { batchSize: 5000 }),
);
const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
console.log(`\nETL concluído: ${processed.toLocaleString()} registros em ${totalTime}s | ${errors} erros`);
}
// Uso: processar arquivo de 50GB comprimido com ~100MB de memória
await etlPipeline('transactions-2024.csv.gz', pool);
HTTP Response Streaming (Server-Sent Events)
const http = require('node:http');
const server = http.createServer((req, res) => {
if (req.url === '/events') {
// Server-Sent Events — streaming de dados do servidor para o cliente
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// Envia heartbeat a cada 15s para manter a conexão viva
const heartbeat = setInterval(() => {
res.write(':heartbeat\n\n'); // Comentário SSE — ignorado pelo cliente
}, 15000);
// Simula stream de dados em tempo real
let eventId = 0;
const dataInterval = setInterval(() => {
const event = {
id: ++eventId,
timestamp: Date.now(),
cpu: (Math.random() * 100).toFixed(1),
memory: process.memoryUsage().heapUsed,
};
res.write(`id: ${event.id}\n`);
res.write(`event: metrics\n`);
res.write(`data: ${JSON.stringify(event)}\n\n`);
}, 1000);
// Cleanup quando o cliente desconecta
req.on('close', () => {
clearInterval(heartbeat);
clearInterval(dataInterval);
console.log('Cliente desconectou do SSE');
});
}
});
server.listen(3000);
Performance: Streaming vs Buffering
A diferença de consumo de memória entre buffering e streaming é dramática e cresce linearmente com o tamanho do input:
const { performance } = require('node:perf_hooks');
// Benchmark: processar arquivo de N tamanho
async function benchmarkBuffered(filepath) {
const start = performance.now();
const memBefore = process.memoryUsage().heapUsed;
const data = require('node:fs').readFileSync(filepath);
// Simula processamento: contar linhas
const lines = data.toString().split('\n').length;
const memAfter = process.memoryUsage().heapUsed;
const duration = performance.now() - start;
return {
mode: 'buffered',
lines,
duration: `${duration.toFixed(0)}ms`,
memoryDelta: `${((memAfter - memBefore) / 1024 / 1024).toFixed(1)}MB`,
};
}
async function benchmarkStreamed(filepath) {
const start = performance.now();
const memBefore = process.memoryUsage().heapUsed;
let lines = 0;
const rl = require('node:readline').createInterface({
input: require('node:fs').createReadStream(filepath),
crlfDelay: Infinity,
});
for await (const _ of rl) {
lines++;
}
const memAfter = process.memoryUsage().heapUsed;
const duration = performance.now() - start;
return {
mode: 'streamed',
lines,
duration: `${duration.toFixed(0)}ms`,
memoryDelta: `${((memAfter - memBefore) / 1024 / 1024).toFixed(1)}MB`,
};
}
// Resultados típicos para arquivo de 1GB:
// buffered: { duration: '3200ms', memoryDelta: '1024.0MB' }
// streamed: { duration: '2800ms', memoryDelta: '2.1MB' }
//
// Para 5GB (com --max-old-space-size=8192):
// buffered: { duration: '18000ms', memoryDelta: '5120.0MB' }
// streamed: { duration: '14000ms', memoryDelta: '2.3MB' } <-- memória CONSTANTE
Resumo de Decisão
Pergunta: "Devo usar streams ou carregar tudo na memória?"
1. O tamanho do input é previsível e pequeno (< 50MB)?
→ Buffering é aceitável. Mais simples de debugar.
2. O input pode ser arbitrariamente grande? (upload, arquivo, query de DB)
→ SEMPRE use streams. Sem exceção.
3. Preciso de transformações encadeadas? (parse -> validate -> enrich -> save)
→ pipeline() com Transform streams
4. Preciso de compatibilidade Node.js + browser?
→ Web Streams API (ReadableStream, TransformStream)
5. Quero código legível sem callbacks?
→ for await...of com async iterators
6. .pipe() ou pipeline()?
→ SEMPRE pipeline(). Sem exceção.
→ .pipe() não faz cleanup em caso de erro = leak de memória/file descriptors
Referencias e Fontes
- Node.js Streams Documentation — https://nodejs.org/api/stream.html — Documentacao oficial sobre Readable, Writable, Transform e pipeline
- “Node.js Design Patterns” — Mario Casciaro, Luciano Mammino — Cobertura detalhada de Streams, backpressure e patterns de composicao