Advanced Stream Patterns
You know the basic stream operations — pipe, pipeline, read, write. Now we go deeper: object mode, custom protocol streams, backpressure internals, and the attacks that exploit misunderstandings of how streams work. This is where streams become weapons instead of utilities.
Object Mode Streams
Regular streams pass Buffer or string chunks. Object mode streams pass arbitrary JavaScript values — one value per push() call. The only value you cannot push as data is null, because push(null) signals end-of-stream.
const { Readable, Writable, Transform } = require('stream')
// Object-mode Readable: emits the elements of `users` one per chunk, in
// array order, then signals end-of-stream.
class UserReader extends Readable {
  constructor(users) {
    super({ objectMode: true })
    this.users = users
    this.index = 0
  }

  // Invoked by the stream machinery whenever it wants more data. Each call
  // pushes exactly one element, or null once the array has been exhausted.
  _read() {
    const exhausted = this.index >= this.users.length
    this.push(exhausted ? null : this.users[this.index++])
  }
}
// Object-mode Transform: decorates each user record with enrichment
// metadata (enriched flag, timestamp, default role). It emits a shallow copy
// (own enumerable properties) so the caller's objects are never mutated.
class UserEnricher extends Transform {
  constructor() {
    super({ objectMode: true })
  }

  _transform(user, encoding, callback) {
    // `||` (not `??`) on purpose: a missing, null, or empty role all fall
    // back to the default 'user' role.
    callback(null, {
      ...user,
      enriched: true,
      timestamp: Date.now(),
      role: user.role || 'user'
    })
  }
}
// Object-mode Writable sink: keeps every received object in
// `this.processed` and logs a one-line summary per object.
class UserWriter extends Writable {
  constructor() {
    super({ objectMode: true })
    this.processed = []
  }

  _write(record, _encoding, done) {
    const { name, role } = record
    this.processed.push(record)
    console.log(`Processed: ${name} (${role})`)
    done()
  }
}
// Wire the three object-mode stages together with .pipe() and report once
// the sink has consumed every record.
const users = [
  { name: 'Mahmoud', role: 'admin' },
  { name: 'Ali', role: 'user' },
  { name: 'Khaled' }
]
const reader = new UserReader(users)
const enricher = new UserEnricher()
const writer = new UserWriter()

writer.on('finish', () => {
  console.log('All objects processed:', writer.processed.length)
})

reader.pipe(enricher).pipe(writer)
Object mode streams are a natural fit for CSV parsing, log processing, and database row streaming. In object mode, highWaterMark counts objects, not bytes (default: 16 objects).
Stream Composition - pipeline
pipeline chains multiple streams, forwards an error from any stage, and destroys every stream in the chain when one of them fails.
const { pipeline } = require('stream/promises')
const fs = require('fs')
const zlib = require('zlib')
const crypto = require('crypto')
// On-disk layout shared by compressAndEncrypt and decryptAndDecompress:
//   [salt: 16 bytes][iv: 16 bytes][AES-256-CBC(gzip(plaintext))]
// NOTE: CBC provides confidentiality only, not integrity; tampering is not
// reliably detected. Use an AEAD mode if the file can be modified by others.
const SALT_LENGTH = 16
const IV_LENGTH = 16
const HEADER_LENGTH = SALT_LENGTH + IV_LENGTH

// Gzip `input`, encrypt it with a key derived from `password`, and write the
// header followed by the ciphertext to `output`.
async function compressAndEncrypt(input, output, password) {
  // A fresh random salt per file means the same password never derives the
  // same key twice. The salt is not secret, so it is stored in the header.
  const salt = crypto.randomBytes(SALT_LENGTH)
  const iv = crypto.randomBytes(IV_LENGTH)
  const key = crypto.scryptSync(password, salt, 32)
  // Write the header first, then append the compressed + encrypted body
  await fs.promises.writeFile(output, Buffer.concat([salt, iv]))
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    crypto.createCipheriv('aes-256-cbc', key, iv),
    fs.createWriteStream(output, { flags: 'a' })
  )
}

// Read exactly the first `length` bytes of `file`. (readFile has no length
// option; it always reads the entire file.) Throws if the file is shorter.
async function readFileHeader(file, length) {
  const handle = await fs.promises.open(file, 'r')
  try {
    const header = Buffer.alloc(length)
    const { bytesRead } = await handle.read(header, 0, length, 0)
    if (bytesRead < length) {
      throw new Error(`${file}: truncated header (${bytesRead} of ${length} bytes)`)
    }
    return header
  } finally {
    await handle.close()
  }
}

// Reverse of compressAndEncrypt: read salt + IV from the header, then
// decrypt and gunzip the remainder of `input` into `output`.
async function decryptAndDecompress(input, output, password) {
  const header = await readFileHeader(input, HEADER_LENGTH)
  const salt = header.subarray(0, SALT_LENGTH)
  const iv = header.subarray(SALT_LENGTH, HEADER_LENGTH)
  const key = crypto.scryptSync(password, salt, 32)
  await pipeline(
    fs.createReadStream(input, { start: HEADER_LENGTH }),
    crypto.createDecipheriv('aes-256-cbc', key, iv),
    zlib.createGunzip(),
    fs.createWriteStream(output)
  )
}
// Top-level await is only valid in ES modules, and this snippet loads its
// dependencies with require() (CommonJS). Drive the round trip from an async
// function and report failures instead of leaving the rejection unhandled.
async function main() {
  await compressAndEncrypt('data.txt', 'data.enc', 'hunter2')
  await decryptAndDecompress('data.enc', 'data.out', 'hunter2')
}

main().catch((err) => {
  console.error('Round trip failed:', err)
  process.exitCode = 1
})
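pipeline vs pump:
- pipeline - built into Node (callback form in 'stream' since v10, promise form in 'stream/promises' since v15); preferred
- pump - npm package that predates pipeline; only needed for very old Node versions
- Both destroy all streams on error, but pipeline is native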
Backpressure Deep Dive
Backpressure is the mechanism that keeps a fast producer from overwhelming a slow consumer: the consumer signals "stop" when its buffer is full and "go" once it has drained.
const { Writable } = require('stream')
// Slow sink: each chunk takes 100 ms to "write", so a tight producer loop
// fills the 16 KB internal buffer almost immediately.
const writable = new Writable({
  highWaterMark: 16384, // 16KB internal buffer
  write(chunk, encoding, callback) {
    // Simulate slow consumer (e.g., network, disk)
    setTimeout(() => {
      console.log(`Wrote ${chunk.length} bytes`)
      callback()
    }, 100)
  }
})

// 'drain' fires once the buffer has been flushed after write() returned false
writable.on('drain', () => {
  console.log('DRAIN - buffer cleared, safe to write again')
})

const TOTAL_BYTES = 1000000
const CHUNK_SIZE = 8192 // 8KB
let written = 0

// Fast producer: writes until write() signals backpressure, then parks itself
// on 'drain' and resumes exactly where it stopped. Ends the stream when done.
function produce() {
  let canWrite = true
  while (canWrite && written < TOTAL_BYTES) {
    // Trim the final chunk so exactly TOTAL_BYTES are produced
    const chunk = Buffer.alloc(Math.min(CHUNK_SIZE, TOTAL_BYTES - written))
    written += chunk.length
    canWrite = writable.write(chunk)
    if (!canWrite) {
      console.log(`Backpressure at ${written} bytes - pausing`)
      writable.once('drain', produce)
    }
  }
  // Everything has been handed off and we are not waiting on 'drain':
  // signal end of input so the writable can finish.
  if (canWrite) {
    writable.end()
  }
}

produce()
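When writable.write() returns false, the internal buffer has reached highWaterMark. The chunk was still accepted, but you must stop writing until the 'drain' event fires — otherwise the buffer keeps growing without bound.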
highWaterMark internals:
- Default: 16 KiB for byte streams (64 KiB since Node.js 22), 16 objects in objectMode
- write() returns false once the buffered length reaches highWaterMark
- 'drain' fires after write() has returned false and the buffer has then been completely flushed
- You can tune it: higher = more memory per stream but fewer pause/resume cycles
Custom Duplex/Transform for Real Protocols
Building a Transform stream that parses a length-prefixed binary protocol frame by frame, even when frames arrive split across many chunks.
const { Transform } = require('stream')
// Binary protocol: [length:4 bytes, big-endian uint32][payload:N bytes]
// Transform that takes raw bytes and emits one { timestamp, length, data }
// object per complete frame, reassembling frames split across chunks.
class FrameParser extends Transform {
  constructor() {
    // Bytes in, objects out: only the readable side is in object mode, so
    // string writes are converted to Buffers and the writable highWaterMark
    // is measured in bytes.
    super({ readableObjectMode: true })
    this.buffer = Buffer.alloc(0) // received but not yet parsed bytes
    this.frameLength = -1         // payload length of current frame; -1 = header not read yet
  }

  _transform(chunk, encoding, callback) {
    // Append incoming data to internal buffer
    this.buffer = Buffer.concat([this.buffer, chunk])
    // Extract as many complete frames as the buffer now holds
    while (true) {
      if (this.frameLength === -1) {
        // Need at least 4 bytes for length header
        if (this.buffer.length < 4) break
        this.frameLength = this.buffer.readUInt32BE(0)
        this.buffer = this.buffer.subarray(4)
      }
      // Payload still incomplete - wait for more data
      if (this.buffer.length < this.frameLength) break
      const frame = this.buffer.subarray(0, this.frameLength)
      this.buffer = this.buffer.subarray(this.frameLength)
      this.frameLength = -1
      this.push({
        timestamp: Date.now(),
        length: frame.length,
        data: frame
      })
    }
    callback()
  }

  _flush(callback) {
    // Input ended mid-frame: either a header was consumed and its payload is
    // missing or short, or a partial header is left over. Both are protocol
    // errors and are reported through the stream's 'error' event.
    const headerPending = this.frameLength !== -1
    if (headerPending || this.buffer.length > 0) {
      const leftover = this.buffer.length + (headerPending ? 4 : 0)
      callback(new Error(`Incomplete frame: ${leftover} leftover bytes`))
      return
    }
    callback()
  }
}
// Usage
const parser = new FrameParser()
parser.on('data', (frame) => {
  console.log(`Frame: ${frame.length} bytes at ${frame.timestamp}`)
})
// Without an 'error' listener, a malformed stream would crash the process
parser.on('error', (err) => {
  console.error('Protocol error:', err.message)
})
// Simulate fragmented network data
parser.write(Buffer.from([0x00, 0x00, 0x00, 0x05])) // length header
parser.write(Buffer.from('Hel')) // partial payload
parser.write(Buffer.from('lo')) // rest of payload
// Signal end of input so _flush can detect a truncated trailing frame
parser.end()
Error Handling in Streams
const { pipeline } = require('stream/promises')
const fs = require('fs')
const zlib = require('zlib')
/**
 * Gzip `input` into `output`, reporting failure instead of throwing.
 *
 * pipeline() destroys every stream in the chain on error, which releases
 * their file descriptors. It does NOT delete `output`, though: the write
 * stream has already created or truncated it. Without cleanup, a failed run
 * would leave an empty or partial file behind that looks like a result.
 *
 * @param {string} input - path of the file to compress
 * @param {string} output - path of the gzip file to create
 * @returns {Promise<boolean>} true on success, false if the pipeline failed
 */
async function safePipeline(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    )
    console.log('Pipeline completed cleanly')
    return true
  } catch (err) {
    console.error('Pipeline failed:', err.message)
    // Remove the partial/empty output; `force` ignores a missing file
    await fs.promises.rm(output, { force: true })
    return false
  }
}
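pipeline handles stream errors, premature destroy, and aborted signals, and it destroys every stream in the chain so their file descriptors are released. It does not remove a partially written output file — that cleanup is still up to you. Without pipeline you risk dangling streams, memory leaks, and file handle leaks.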
// Manual error handling (if you can't use pipeline).
// .pipe() does not forward errors between streams, so attach one shared
// handler to both ends. The first error tears down both streams (reader
// first) and is logged once; later errors, including the ones triggered by
// destroy(err) itself, are ignored.
const readStream = fs.createReadStream('input.txt')
const writeStream = fs.createWriteStream('output.txt')
const endpoints = [readStream, writeStream]
let cleanedUp = false

const onError = (err) => {
  if (cleanedUp) return
  cleanedUp = true
  for (const stream of endpoints) stream.destroy(err)
  console.error('Stream error:', err.message)
}

for (const stream of endpoints) stream.on('error', onError)
readStream.pipe(writeStream)
Security - DoS via Slow Loris on Streams
// VULNERABLE - unbounded stream accumulation
const http = require('http')
const { Transform } = require('stream')
// Still deliberately vulnerable: every frame of a request is kept in memory
// until the request ends, and FrameParser trusts any declared frame length.
http.createServer((req, res) => {
  const parser = new FrameParser()
  req.pipe(parser)
  const frames = []
  parser.on('data', (frame) => {
    frames.push(frame) // memory grows unbounded!
  })
  // Without this listener, one truncated request makes the parser emit an
  // unhandled 'error', which crashes the whole server process.
  parser.on('error', (err) => {
    res.statusCode = 400
    res.end(`Bad request: ${err.message}`)
  })
  parser.on('end', () => {
    res.end(`Received ${frames.length} frames`)
  })
}).listen(3000)
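A slow-loris attacker sends data byte-by-byte, so each tiny TCP segment typically arrives as its own chunk and the connection stays open as long as possible. If your parser holds partial data in memory while waiting for complete frames — and trusts whatever length the header declares — memory grows without bound. Re-concatenating the pending buffer on every tiny chunk also turns the trickle into quadratic copying work.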
// DEFENSE - enforce size limits
// Hardened FrameParser for untrusted input. Same wire format:
// [length:4 bytes, big-endian uint32][payload:N bytes].
// Each frame's declared length is checked against maxFrameSize as soon as its
// header arrives, so a hostile header is rejected before any payload is
// buffered. Pending (unparsed) bytes therefore stay around maxFrameSize plus
// one incoming chunk, instead of growing with the whole connection.
class SafeFrameParser extends Transform {
  constructor(maxFrameSize = 10 * 1024 * 1024) { // 10MB max
    // Bytes in, frame objects out: the writable side stays in byte mode so
    // highWaterMark and backpressure are measured in bytes.
    super({ readableObjectMode: true })
    this.buffer = Buffer.alloc(0) // received but not yet parsed bytes
    this.frameLength = -1         // payload length of current frame; -1 = header not read yet
    this.maxFrameSize = maxFrameSize
  }

  _transform(chunk, encoding, callback) {
    // NOTE: concatenating on every chunk re-copies the pending bytes, so a
    // byte-at-a-time sender costs O(frameLength^2) copying. maxFrameSize caps
    // that cost too, so keep it as small as the protocol allows.
    this.buffer = Buffer.concat([this.buffer, chunk])
    while (true) {
      if (this.frameLength === -1) {
        if (this.buffer.length < 4) break
        const declared = this.buffer.readUInt32BE(0)
        // Reject on the header alone, before buffering any payload
        if (declared > this.maxFrameSize) {
          callback(new Error('Frame too large - possible DoS'))
          return
        }
        this.frameLength = declared
        this.buffer = this.buffer.subarray(4)
      }
      // Payload still incomplete - wait for more data
      if (this.buffer.length < this.frameLength) break
      const frame = this.buffer.subarray(0, this.frameLength)
      this.buffer = this.buffer.subarray(this.frameLength)
      this.frameLength = -1
      this.push({
        timestamp: Date.now(),
        length: frame.length,
        data: frame
      })
    }
    callback()
  }

  _flush(callback) {
    // Input ended mid-frame (header consumed without its full payload, or a
    // partial header left over): report it rather than silently dropping it.
    const headerPending = this.frameLength !== -1
    if (headerPending || this.buffer.length > 0) {
      const leftover = this.buffer.length + (headerPending ? 4 : 0)
      callback(new Error(`Incomplete frame: ${leftover} leftover bytes`))
      return
    }
    callback()
  }
}
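Backpressure as DoS mitigation: when the consumer is slow, backpressure propagates to the TCP socket. Node stops reading from it, the kernel's receive buffer fills, TCP flow control shrinks the advertised window, and the attacker's data rate drops. This only works if you never buffer more than highWaterMark yourself. A 'data' handler that stores every chunk or frame (as in the vulnerable server above) consumes data as fast as it arrives and defeats backpressure entirely.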
Prerequisites
- adv_03_native_addons.md - advanced stream patterns for native addon I/O
next -> ref_01_events_api.md