# Core 06 - Streams
## Basic Idea
Files bigger than available RAM shouldn't crash your app. Streams process data in chunks, one piece at a time, so memory use stays flat no matter how large the input is. Node's stream API is the Swiss Army knife for any I/O pipeline.
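A minimal sketch of the chunked model, using a hypothetical helper `countBytes()`: async iteration pulls one chunk at a time, so only the current chunk (at most `highWaterMark` bytes, 64KB by default for `fs.createReadStream`) is held in memory at once, however large the source is.

```javascript
// hypothetical helper: total size of a stream, read one chunk at a time
async function countBytes(readable) {
  let total = 0
  for await (const chunk of readable) {
    total += chunk.length // the chunk can be garbage-collected after this line
  }
  return total
}
```

Usage: `countBytes(fs.createReadStream('large-file.bin'))` resolves to the file size while memory stays roughly constant.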
## Stream Types - The Four Flavors
```javascript
const { Readable, Writable, Transform, Duplex, pipeline } = require('stream')

// Readable: a source you consume data from (file read, HTTP response on the client side)
// Writable: a destination you push data into (file write, HTTP request on the client side)
// Transform: reads input, transforms it, writes output (gzip, encrypt)
// Duplex: readable and writable at the same time, with independent sides (TCP socket)
```
Every stream in Node is one of these four types. Understand them and you understand how half of Node works.
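The relationships between the four types can be checked directly. A minimal sketch: a `Transform` (here the built-in `PassThrough`) is a special `Duplex`, and a `Duplex` passes `instanceof` checks for both `Readable` and `Writable`. The hypothetical `makeFakeSocket()` shows a plain `Duplex` whose two sides are independent, the way a TCP socket's outgoing and incoming data are.

```javascript
const { Readable, Writable, Duplex, PassThrough } = require('stream')

// PassThrough is the identity Transform; a Transform is a Duplex,
// and a Duplex counts as both Readable and Writable
const pass = new PassThrough()
const typeChecks = {
  transformIsDuplex: pass instanceof Duplex,
  isReadable: pass instanceof Readable,
  isWritable: pass instanceof Writable,
}

// hypothetical fake socket with independent sides:
// - writable side records every chunk written to it in `sent`
// - readable side emits a canned `reply`, then ends
function makeFakeSocket(sent, reply) {
  const queue = [...reply]
  return new Duplex({
    write(chunk, encoding, callback) {
      sent.push(chunk.toString())
      callback()
    },
    read() {
      this.push(queue.length ? queue.shift() : null) // null signals end of data
    },
  })
}
```

Writing `'ping'` into the fake socket and reading from it yields the canned reply, not the echoed input: what goes in and what comes out of a `Duplex` need not be related.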
## pipe() vs pipeline()
```javascript
const fs = require('fs')

// pipe() - classic but fragile
const readStream = fs.createReadStream('input.txt')
const writeStream = fs.createWriteStream('output.txt')
readStream.pipe(writeStream)
// if readStream errors, writeStream is not closed - its file descriptor leaks
// if writeStream errors, readStream is unpiped but not destroyed - resource leak

// pipeline() - the correct way
const { pipeline } = require('stream/promises')

async function copyFile() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      fs.createWriteStream('output.txt')
    )
    console.log('file copied cleanly')
  } catch (err) {
    console.error('copy failed:', err.message)
    // both streams have already been destroyed by pipeline()
  }
}

copyFile()
```
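pipe() was the original API. It's short and it does handle backpressure, but it doesn't propagate errors or clean up: when one stream in a pipe chain fails, the streams upstream or downstream of it can leak. pipeline() reports the first error through a single callback or promise and destroys every stream in the chain, which makes it the right default for new code.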
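The cleanup guarantee can be observed without touching the filesystem. A minimal sketch, where `demoCleanup()` is a hypothetical helper: an in-memory source fails after one chunk, and by the time `pipeline()` rejects, the sink has already been destroyed. With `source.pipe(sink)` the same failure would leave the sink open.

```javascript
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// hypothetical demo: a source that fails after its first chunk
async function demoCleanup() {
  let sent = false
  const source = new Readable({
    read() {
      if (!sent) {
        sent = true
        this.push('first chunk')
      } else {
        this.destroy(new Error('source failed'))
      }
    },
  })
  // sink that accepts and discards everything
  const sink = new Writable({ write(chunk, encoding, callback) { callback() } })

  try {
    await pipeline(source, sink)
  } catch (err) {
    // pipeline() destroys every stream before it rejects
    return { message: err.message, sinkDestroyed: sink.destroyed }
  }
}
```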
## Stream Modes - Flowing vs Paused
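```javascript
const fs = require('fs')

// PAUSED mode - the default. You call read(), inside a 'readable' handler
// (calling read() right after creating the stream returns null: no data has arrived yet)
const stream = fs.createReadStream('file.txt')
stream.on('readable', () => {
  let chunk
  // read() returns null once the internal buffer is empty
  while ((chunk = stream.read()) !== null) {
    console.log('read', chunk.length, 'bytes')
  }
})

// stream.resume() switches to flowing mode (no effect while a 'readable' listener is attached)
// stream.pause() switches back to paused

// FLOWING mode - 'data' events fire automatically
const stream2 = fs.createReadStream('file.txt')
stream2.on('data', (chunk) => {
  console.log('received', chunk.length, 'bytes')
  stream2.pause() // manual backpressure: stop the flow for 100ms
  setTimeout(() => stream2.resume(), 100)
})
```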
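In paused mode, you control when to read. In flowing mode, data arrives as fast as the source produces it. Adding a 'data' listener (or calling pipe() or resume()) switches the stream to flowing mode automatically. Removing the 'data' listener does not switch it back: the stream keeps flowing, and data emitted while no listener is attached is lost. Call pause() explicitly to return to paused mode.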
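The mode is visible through the `readableFlowing` property, which has three states: `null` (no consumer yet), `true` (flowing) and `false` (explicitly paused). A minimal sketch, using an in-memory `Readable.from()` source instead of a file and a hypothetical helper `flowingStates()`, that records the state after each step, including after the 'data' listener is removed.

```javascript
const { Readable } = require('stream')

// hypothetical helper: record readableFlowing after each mode change
function flowingStates() {
  const stream = Readable.from(['a', 'b', 'c'])
  const states = [stream.readableFlowing] // null: no consumer attached yet

  const onData = () => {}
  stream.on('data', onData)
  states.push(stream.readableFlowing) // true: a 'data' listener starts flowing mode

  stream.pause()
  states.push(stream.readableFlowing) // false: explicitly paused

  stream.resume()
  stream.removeListener('data', onData)
  states.push(stream.readableFlowing) // still true: removing the listener does not pause

  stream.destroy() // clean up the demo stream
  return states
}
```

`flowingStates()` returns `[null, true, false, true]`.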
## Backpressure - The Real Problem
```javascript
const { Writable } = require('stream')
const { pipeline } = require('stream/promises')
const fs = require('fs')

// slow writable simulates backpressure
const slowWrite = new Writable({
  highWaterMark: 16 * 1024, // 16KB buffering threshold
  write(chunk, encoding, callback) {
    // simulate a slow disk: acknowledge each chunk after 10ms
    setTimeout(callback, 10)
  }
})

// fast readable pushes data
const fastRead = fs.createReadStream('large-file.bin', {
  highWaterMark: 64 * 1024 // 64KB chunks
})

// the readable outruns the writable, so the writable's buffer fills up;
// pipeline() pauses the readable until the writable drains
pipeline(fastRead, slowWrite).catch(console.error)

// manual approach - check write()'s return value, then wait for 'drain'
const writable = fs.createWriteStream('output.bin', { highWaterMark: 16 * 1024 })
const ok = writable.write(Buffer.alloc(64 * 1024)) // false: 64KB is over the 16KB threshold
if (!ok) {
  writable.once('drain', () => {
    console.log('buffer drained - safe to write more')
    writable.end()
  })
}
```
highWaterMark is a buffering threshold, not a hard cap. Once a writable's buffered data reaches highWaterMark, write() returns false (the chunk just written is still accepted and buffered). That's the signal to stop pushing data until the 'drain' event fires. pipe() and pipeline() handle backpressure automatically - one of the main reasons to use them.
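When you write by hand instead of using pipeline(), the producer has to honor write()'s return value itself. A minimal sketch, where `writeAll()` is a hypothetical helper: it stops at every `false` and waits for 'drain' before continuing, then ends the stream and waits for it to finish.

```javascript
const { once } = require('events')
const { finished } = require('stream/promises')

// hypothetical helper: write chunks by hand while respecting backpressure
async function writeAll(writable, chunks) {
  let drainWaits = 0
  for (const chunk of chunks) {
    // write() returns false once the buffered bytes reach highWaterMark
    if (!writable.write(chunk)) {
      drainWaits++
      await once(writable, 'drain') // stop producing until the buffer empties
    }
  }
  writable.end()
  await finished(writable) // resolves after 'finish', rejects on 'error'
  return drainWaits
}
```

For example, `writeAll(fs.createWriteStream('output.bin'), chunks)`. The returned count of drain waits only exists to make the backpressure visible; real code would drop it.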
## Creating Custom Transforms
```javascript
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')
const fs = require('fs')

// uppercase transform - silly example but shows the pattern
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // chunk is a Buffer - convert to string, transform, push
    // (fine for ASCII; a multi-byte UTF-8 character split across two chunks
    // would come out as U+FFFD replacement characters)
    const upper = chunk.toString().toUpperCase()
    this.push(upper)
    callback()
  }

  _flush(callback) {
    // called after the last chunk, before the stream ends - for trailing data
    this.push('\n--- TRANSFORM COMPLETE ---\n')
    callback()
  }
}

async function upperCaseFile() {
  await pipeline(
    fs.createReadStream('input.txt'),
    new UpperCaseTransform(),
    fs.createWriteStream('output.txt')
  )
  console.log('file uppercased')
}

upperCaseFile().catch(console.error)
```
- `_transform(chunk, encoding, callback)` receives each chunk
- Call `this.push(data)` to pass transformed data downstream
- Call `callback()` when ready for the next chunk, or `callback(err)` to fail the stream
- `_flush(callback)` fires after the last chunk, before the stream ends - for final cleanup or trailing data
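A transform where `_flush` does real work is a line splitter: chunk boundaries rarely line up with newlines, so the incomplete last line has to be carried over and emitted at the end. A minimal sketch, using the constructor options `transform`/`flush` (equivalent to subclassing) and a hypothetical `lineSplitter()` factory. It decodes through Node's `StringDecoder`, which avoids the split-character problem of a plain `chunk.toString()`.

```javascript
const { Transform } = require('stream')
const { StringDecoder } = require('string_decoder')

// hypothetical line splitter: bytes in, one string per line out
function lineSplitter() {
  const decoder = new StringDecoder('utf8') // keeps multi-byte characters intact across chunks
  let tail = '' // incomplete last line carried over to the next chunk
  return new Transform({
    readableObjectMode: true, // readable side emits strings, not Buffers
    transform(chunk, encoding, callback) {
      const lines = (tail + decoder.write(chunk)).split('\n')
      tail = lines.pop() // the last piece may be an unfinished line
      for (const line of lines) this.push(line)
      callback()
    },
    flush(callback) {
      const rest = tail + decoder.end()
      if (rest) this.push(rest) // final line without a trailing newline
      callback()
    },
  })
}
```

Feeding it the chunks `'ab\ncd'` and `'e\nf'` produces the lines `'ab'`, `'cde'` and `'f'`.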
## Real Example: File Encrypt/Decrypt Pipeline
```javascript
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')
const fs = require('fs')
const crypto = require('crypto')

// note: CBC provides confidentiality only, not integrity (no tamper detection)
const ALGORITHM = 'aes-256-cbc'
// demo key/IV live only for this process; real code must store the key securely
// and save the IV next to the ciphertext (it is not secret, but decryption needs it)
const KEY = crypto.randomBytes(32)
const IV = crypto.randomBytes(16)

class EncryptStream extends Transform {
  constructor(key, iv) {
    super()
    this.cipher = crypto.createCipheriv(ALGORITHM, key, iv)
  }

  _transform(chunk, encoding, callback) {
    try {
      this.push(this.cipher.update(chunk))
      callback()
    } catch (err) {
      callback(err)
    }
  }

  _flush(callback) {
    try {
      this.push(this.cipher.final()) // final block, including padding
      callback()
    } catch (err) {
      callback(err)
    }
  }
}

class DecryptStream extends Transform {
  constructor(key, iv) {
    super()
    this.decipher = crypto.createDecipheriv(ALGORITHM, key, iv)
  }

  _transform(chunk, encoding, callback) {
    try {
      this.push(this.decipher.update(chunk))
      callback()
    } catch (err) {
      callback(err)
    }
  }

  _flush(callback) {
    try {
      this.push(this.decipher.final()) // throws on bad padding (wrong key/IV)
      callback()
    } catch (err) {
      callback(err)
    }
  }
}

async function encryptFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    new EncryptStream(KEY, IV),
    fs.createWriteStream(output)
  )
}

async function decryptFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    new DecryptStream(KEY, IV),
    fs.createWriteStream(output)
  )
}

// top-level await is not available in CommonJS (require) modules, so wrap it
async function main() {
  await encryptFile('secret.txt', 'secret.enc')
  await decryptFile('secret.enc', 'secret_decrypted.txt')
}

main().catch(console.error)
```
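The objects returned by `crypto.createCipheriv()` and `crypto.createDecipheriv()` are already Transform streams, so they can go straight into `pipeline()` without a wrapper class. A minimal in-memory sketch, with hypothetical helpers `collectInto()` and `roundTrip()`; it swaps in `aes-256-gcm` (not the CBC mode used above) because GCM also authenticates the data: decryption fails if the ciphertext or the auth tag was modified.

```javascript
const crypto = require('crypto')
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// hypothetical sink: collect every chunk a pipeline produces
function collectInto(parts) {
  return new Writable({ write(chunk, encoding, callback) { parts.push(chunk); callback() } })
}

// hypothetical demo: encrypt then decrypt a string, entirely in memory
async function roundTrip(plaintext) {
  const key = crypto.randomBytes(32)
  const iv = crypto.randomBytes(12) // 96-bit IV, the usual size for GCM; never reuse with the same key

  const encrypted = []
  const cipher = crypto.createCipheriv('aes-256-gcm', key, iv)
  await pipeline(Readable.from([Buffer.from(plaintext)]), cipher, collectInto(encrypted))
  const tag = cipher.getAuthTag() // available only after the cipher has finished

  const decrypted = []
  const decipher = crypto.createDecipheriv('aes-256-gcm', key, iv)
  decipher.setAuthTag(tag) // final() throws if the ciphertext or tag was tampered with
  await pipeline(Readable.from([Buffer.concat(encrypted)]), decipher, collectInto(decrypted))
  return Buffer.concat(decrypted).toString()
}
```

In a file-based version the IV and auth tag would have to be stored alongside the ciphertext, for example as a fixed-size header and trailer.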
## Security: DoS via Stream Bomb
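```javascript
// DANGER - unbounded upload (infinite stream)
const http = require('http')
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

http.createServer((req, res) => {
  const ws = fs.createWriteStream('/tmp/upload.bin')
  req.pipe(ws)
  // attacker sends endless data - fills disk - DoS
}).listen(3000)

// DEFENSE - enforce a total size limit inside the pipeline
const MAX_SIZE = 100 * 1024 * 1024 // 100MB

// fails the pipeline as soon as more than `max` bytes have passed through
function limitSize(max) {
  let total = 0
  return new Transform({
    transform(chunk, encoding, callback) {
      total += chunk.length
      if (total > max) return callback(new Error('upload too large'))
      callback(null, chunk)
    }
  })
}

http.createServer(async (req, res) => {
  // cheap early rejection when the client declares an oversized body
  if (Number(req.headers['content-length']) > MAX_SIZE) {
    return res.writeHead(413).end('upload too large')
  }
  try {
    await pipeline(req, limitSize(MAX_SIZE), fs.createWriteStream('/tmp/upload.bin'))
    res.end('upload complete')
  } catch (err) {
    // best effort: pipeline() destroyed req, which may already have closed the socket
    const status = err.message === 'upload too large' ? 413 : 500
    if (!res.headersSent) res.writeHead(status).end(err.message)
  }
}).listen(3001) // separate port so both demo servers can run side by side
```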
Stream bombs come in two flavors: infinite data (unbounded uploads) and zip bombs (tiny input, massive output after decompression). Without size limits, an attacker fills disk or memory until the process dies. highWaterMark only bounds how much a single stream buffers at once, not how many bytes flow through it in total, so always enforce total size limits at the application boundary as well.
Another attack: a stream that produces data faster than the consumer can process it. If you skip backpressure handling (for example by ignoring write()'s return value), memory grows without bound until the process runs out of memory (OOM).
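For the zip-bomb flavor, the limit has to apply to the decompressed output, not the compressed input. A minimal sketch, with hypothetical helpers `limitBytes()` and `safeGunzip()`: an async generator stage placed after `zlib.createGunzip()` in `pipeline()` counts output bytes and throws once a cap is exceeded, which makes `pipeline()` destroy every stream and reject.

```javascript
const zlib = require('zlib')
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// hypothetical stage: pass chunks through until more than `max` bytes have been seen
async function* limitBytes(source, max) {
  let total = 0
  for await (const chunk of source) {
    total += chunk.length
    if (total > max) throw new Error('decompressed output too large')
    yield chunk
  }
}

// hypothetical helper: gunzip a Buffer, refusing to produce more than maxBytes
async function safeGunzip(compressed, maxBytes) {
  const parts = []
  await pipeline(
    Readable.from([compressed]),
    zlib.createGunzip(),
    (source) => limitBytes(source, maxBytes), // caps the output, not the input
    new Writable({ write(chunk, encoding, callback) { parts.push(chunk); callback() } })
  )
  return Buffer.concat(parts)
}
```

A megabyte of zeros gzips to about a kilobyte, so a 64KB cap rejects it almost immediately while a 2MB cap lets it through.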
## Summary
- Use `pipeline()` not `pipe()` - error cleanup is non-negotiable
- Backpressure prevents OOM - let streams handle it automatically
- `Transform` for processing data between read and write
- Always enforce size limits in production - stream bombs are real
- Four types: Readable, Writable, Transform, Duplex
next -> core_07_zlib.md