
Advanced Stream Patterns

You know basic streams: pipe, pipeline, read, write. Now we go deeper: object mode, custom protocol streams, backpressure internals, and the attacks that exploit stream misunderstandings. This is where streams become weapons instead of utilities.

Object Mode Streams

Regular streams pass Buffer or string chunks. Object mode streams pass arbitrary JavaScript values (anything except null, which signals end of stream), one value per push() call.

const { Readable, Writable, Transform } = require('stream')

// Object mode readable - emits user objects
class UserReader extends Readable {
  constructor(users) {
    super({ objectMode: true })
    this.users = users
    this.index = 0
  }

  _read() {
    if (this.index >= this.users.length) {
      this.push(null) // signal end
      return
    }
    this.push(this.users[this.index])
    this.index++
  }
}

// Object mode transform - modifies each object
class UserEnricher extends Transform {
  constructor() {
    super({ objectMode: true })
  }

  _transform(user, encoding, callback) {
    user.enriched = true
    user.timestamp = Date.now()
    user.role = user.role || 'user'
    this.push(user)
    callback()
  }
}

// Object mode writable - processes objects
class UserWriter extends Writable {
  constructor() {
    super({ objectMode: true })
    this.processed = []
  }

  _write(user, encoding, callback) {
    this.processed.push(user)
    console.log(`Processed: ${user.name} (${user.role})`)
    callback()
  }
}

// Pipe object-mode streams
const reader = new UserReader([
  { name: 'Mahmoud', role: 'admin' },
  { name: 'Ali', role: 'user' },
  { name: 'Khaled' }
])

const enricher = new UserEnricher()
const writer = new UserWriter()

reader.pipe(enricher).pipe(writer)

writer.on('finish', () => {
  console.log('All objects processed:', writer.processed.length)
})

Object mode streams are a good fit for CSV parsing, log processing, and database row streaming. In object mode, highWaterMark counts objects, not bytes (default: 16 objects), no matter how large each object is.
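To see that the object-mode highWaterMark counts items rather than bytes, here is a small sketch that pushes large objects into a Readable nobody is reading. It relies only on documented behavior (push() returns false once the buffer holds highWaterMark items); the highWaterMark of 3 and the ~100 KB payloads are arbitrary values chosen for the demo.

```javascript
const { Readable } = require('stream')

// Default buffer size for an object-mode stream
console.log(new Readable({ objectMode: true }).readableHighWaterMark) // 16

// Nobody reads from this stream, so every pushed object stays buffered
const r = new Readable({ objectMode: true, highWaterMark: 3, read() {} })

const results = []
for (let i = 0; i < 5; i++) {
  // Each object counts as 1 toward highWaterMark, whatever its size
  results.push(r.push({ id: i, payload: 'x'.repeat(100000) }))
}

console.log(results)          // [ true, true, false, false, false ]
console.log(r.readableLength) // 5
```

Roughly 500 KB is now buffered, yet the stream reports only 5 items, so object sizes need their own bound when the objects themselves can be large.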

Stream Composition - pipeline

pipeline chains multiple streams, forwards errors between them, and destroys every stream in the chain when any one of them fails.

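const { pipeline } = require('stream/promises')
const fs = require('fs')
const zlib = require('zlib')
const crypto = require('crypto')

// Output layout: [salt:16][iv:16][gzip + AES-256-CBC ciphertext]
// Note: CBC is unauthenticated; prefer an AEAD mode such as aes-256-gcm if tampering matters
async function compressAndEncrypt(input, output, password) {
  const salt = crypto.randomBytes(16) // random per file, never a hard-coded string
  const key = crypto.scryptSync(password, salt, 32)
  const iv = crypto.randomBytes(16)

  // Write salt + IV at start of output
  await fs.promises.writeFile(output, Buffer.concat([salt, iv]))

  // Append compressed + encrypted data
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    crypto.createCipheriv('aes-256-cbc', key, iv),
    fs.createWriteStream(output, { flags: 'a' })
  )
}

async function decryptAndDecompress(input, output, password) {
  // readFile() has no length option (it reads the whole file), so read just the 32-byte header
  const header = Buffer.alloc(32)
  const fh = await fs.promises.open(input, 'r')
  try {
    const { bytesRead } = await fh.read(header, 0, 32, 0)
    if (bytesRead < 32) throw new Error('File too short: missing salt/IV header')
  } finally {
    await fh.close()
  }
  const salt = header.subarray(0, 16)
  const iv = header.subarray(16, 32)
  const key = crypto.scryptSync(password, salt, 32)

  await pipeline(
    fs.createReadStream(input, { start: 32 }),
    crypto.createDecipheriv('aes-256-cbc', key, iv),
    zlib.createGunzip(),
    fs.createWriteStream(output)
  )
}

// CommonJS has no top-level await, so run the demo inside an async function
async function main() {
  await compressAndEncrypt('data.txt', 'data.enc', 'hunter2')
  await decryptAndDecompress('data.enc', 'data.out', 'hunter2')
}

main().catch((err) => {
  console.error(err)
  process.exitCode = 1
})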

pipeline vs pump:
- pipeline: built into Node (stream/promises); preferred
- pump: npm package; works with older Node versions
- Both handle cleanup, but pipeline is native
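pipeline also composes with async generator functions: any middle stage can be a function that receives the previous stage as an async iterable and yields new chunks. A minimal sketch, assuming Node.js 15+ for stream/promises; toUpper and run are illustrative names, not part of Node.

```javascript
const { pipeline } = require('stream/promises')
const { Readable, Writable } = require('stream')

// A transform stage written as an async generator: pipeline passes in the previous stream
async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

async function run() {
  const out = []
  await pipeline(
    Readable.from(['alpha', 'beta', 'gamma']),
    toUpper,
    new Writable({
      objectMode: true, // keep strings as strings instead of converting to Buffers
      write(chunk, encoding, callback) {
        out.push(chunk)
        callback()
      }
    })
  )
  return out
}

run().then((out) => console.log(out)) // [ 'ALPHA', 'BETA', 'GAMMA' ]
```

Errors thrown inside the generator reject the pipeline promise and destroy the other streams, just like an error emitted by a regular Transform.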

Backpressure Deep Dive

Backpressure is the mechanism that stops a fast producer from overwhelming a slow consumer.

const { Writable } = require('stream')

const writable = new Writable({
  highWaterMark: 16384, // 16KB internal buffer

  write(chunk, encoding, callback) {
    // Simulate slow consumer (e.g., network, disk)
    setTimeout(() => {
      console.log(`Wrote ${chunk.length} bytes`)
      callback()
    }, 100)
  }
})

// Track backpressure state
writable.on('drain', () => {
  console.log('DRAIN - buffer cleared, safe to write again')
})

// Fast producer that respects backpressure
const TOTAL_BYTES = 1000000
let written = 0
function produce() {
  while (written < TOTAL_BYTES) {
    const chunk = Buffer.alloc(8192) // 8KB
    written += chunk.length

    if (!writable.write(chunk)) {
      console.log(`Backpressure at ${written} bytes - pausing`)
      writable.once('drain', produce) // resume only after 'drain'
      return
    }
  }
  writable.end() // all data has been queued
}

produce()

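When writable.write() returns false, the chunk has still been accepted, but the internal buffer has reached highWaterMark. Don't write more until the 'drain' event fires; writing anyway just keeps growing the buffer.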

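highWaterMark internals:
- Default: 16 KiB for byte streams (raised to 64 KiB in Node.js 22), 16 objects for objectMode
- write() returns false once the buffered amount reaches highWaterMark
- 'drain' fires after write() has returned false and the buffer has fully emptied
- highWaterMark is a threshold, not a hard cap. You can tune it: higher means more memory per stream but fewer pause/resume cycles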
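The same pause/resume rule reads more naturally with async/await, using events.once() to wait for 'drain'. This is a sketch under assumptions: slowSink and produceWithBackpressure are hypothetical helpers, and setImmediate stands in for real I/O. The producer also records the largest writableLength it sees, which should stay within highWaterMark plus one chunk.

```javascript
const { Writable } = require('stream')
const { once } = require('events')

// Hypothetical slow consumer: each chunk takes one event-loop turn to "write"
function slowSink(highWaterMark) {
  return new Writable({
    highWaterMark,
    write(chunk, encoding, callback) {
      setImmediate(callback)
    }
  })
}

// Hypothetical producer: pause when write() returns false, resume on 'drain'
async function produceWithBackpressure(writable, totalBytes, chunkSize) {
  let written = 0
  let pauses = 0
  let maxBuffered = 0
  while (written < totalBytes) {
    const chunk = Buffer.alloc(Math.min(chunkSize, totalBytes - written))
    written += chunk.length
    const ok = writable.write(chunk)
    maxBuffered = Math.max(maxBuffered, writable.writableLength)
    if (!ok) {
      pauses++
      await once(writable, 'drain') // rejects if the stream emits 'error' instead
    }
  }
  writable.end()
  await once(writable, 'finish')
  return { written, pauses, maxBuffered }
}

produceWithBackpressure(slowSink(16384), 1024 * 1024, 8192).then((stats) => {
  console.log(stats)
})
```

Because once() rejects when the stream emits 'error', a failing writable ends the loop instead of leaving the producer waiting forever for a 'drain' that never comes.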

Custom Duplex/Transform for Real Protocols

Here is a Transform that parses a length-prefixed binary protocol frame by frame, reassembling frames that arrive split across several chunks.

const { Transform } = require('stream')

// Binary protocol: [length:4 bytes, big-endian][payload:N bytes]
class FrameParser extends Transform {
  constructor() {
    // Bytes in, frame objects out: only the readable side is in object mode
    super({ readableObjectMode: true })
    this.buffer = Buffer.alloc(0)
    this.frameLength = -1 // -1 = waiting for a length header
  }

  _transform(chunk, encoding, callback) {
    // Append incoming data to internal buffer
    this.buffer = Buffer.concat([this.buffer, chunk])

    // Extract as many complete frames as the buffer holds
    while (true) {
      if (this.frameLength === -1) {
        // Need at least 4 bytes for length header
        if (this.buffer.length < 4) break

        this.frameLength = this.buffer.readUInt32BE(0)
        this.buffer = this.buffer.subarray(4)
      }

      // Do we have a complete frame?
      if (this.buffer.length < this.frameLength) break

      // Extract the frame (subarray shares memory, no copy)
      const frame = this.buffer.subarray(0, this.frameLength)
      this.buffer = this.buffer.subarray(this.frameLength)
      this.frameLength = -1

      // Push parsed frame as object
      this.push({
        timestamp: Date.now(),
        length: frame.length,
        data: frame
      })
    }

    callback()
  }

  _flush(callback) {
    if (this.buffer.length > 0 || this.frameLength !== -1) {
      // Stream ended mid-header or mid-payload - protocol error
      return callback(new Error(`Incomplete frame: ${this.buffer.length} leftover bytes`))
    }
    callback()
  }
}

// Usage
const parser = new FrameParser()
parser.on('data', (frame) => {
  console.log(`Frame: ${frame.length} bytes at ${frame.timestamp}`)
})
parser.on('error', (err) => console.error(err.message))

// Simulate fragmented network data
parser.write(Buffer.from([0x00, 0x00, 0x00, 0x05])) // length header
parser.write(Buffer.from('Hel'))                       // partial payload
parser.write(Buffer.from('lo'))                        // rest of payload
parser.end()
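The sending side of the same protocol is a much simpler Transform: prefix each chunk with its length. FrameEncoder and encodeAll are hypothetical names for this sketch; it assumes one frame per write() call and payloads under 4 GiB, since the header is a single UInt32.

```javascript
const { Transform } = require('stream')

// Hypothetical counterpart to FrameParser: one [length:4][payload:N] frame per write()
class FrameEncoder extends Transform {
  _transform(chunk, encoding, callback) {
    const header = Buffer.alloc(4)
    header.writeUInt32BE(chunk.length, 0) // big-endian, as FrameParser expects
    callback(null, Buffer.concat([header, chunk]))
  }
}

// Encode a list of payloads and collect the resulting byte stream
function encodeAll(payloads) {
  return new Promise((resolve, reject) => {
    const encoder = new FrameEncoder()
    const parts = []
    encoder.on('data', (buf) => parts.push(buf))
    encoder.on('end', () => resolve(Buffer.concat(parts)))
    encoder.on('error', reject)
    for (const payload of payloads) encoder.write(payload)
    encoder.end()
  })
}

encodeAll([Buffer.from('Hello'), Buffer.from('Hi')]).then((buf) => console.log(buf))
// <Buffer 00 00 00 05 48 65 6c 6c 6f 00 00 00 02 48 69>
```

Feeding this output to FrameParser in arbitrarily sized pieces should give back the original payloads, which makes the pair convenient for testing the parser's fragment handling.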

Error Handling in Streams

const { pipeline } = require('stream/promises')
const fs = require('fs')
const zlib = require('zlib')

async function safePipeline(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    )
    console.log('Pipeline completed cleanly')
  } catch (err) {
    // pipeline has already destroyed ALL streams in the chain
    console.error('Pipeline failed:', err.message)
    // File handles are released, but a partially written output file remains on disk
  }
}

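pipeline handles stream errors, premature destroy, and aborted signals. Without it, you risk dangling streams, memory leaks, and file handle leaks, because .pipe() neither forwards errors nor destroys the other streams when one of them fails.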
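The aborted-signal case refers to the options argument of the promise-based pipeline: pass an AbortSignal, and aborting it destroys every stream in the chain. The sketch below, assuming Node.js 15+ and a hypothetical copyWithTimeout helper, uses it to give up on a consumer that has stalled.

```javascript
const { pipeline } = require('stream/promises')
const { Readable, Writable } = require('stream')

// Hypothetical helper: copy a small source into a consumer, but give up after `ms`
async function copyWithTimeout(ms) {
  const ac = new AbortController()
  const timer = setTimeout(() => ac.abort(), ms)

  // Simulated stalled consumer: it never calls callback, so the write never completes
  const stalled = new Writable({
    write(chunk, encoding, callback) {}
  })

  try {
    await pipeline(Readable.from(['some data']), stalled, { signal: ac.signal })
    return 'finished'
  } catch (err) {
    // Aborting destroys every stream in the chain and rejects with an AbortError
    return err.name
  } finally {
    clearTimeout(timer)
  }
}

copyWithTimeout(100).then((result) => console.log(result)) // AbortError
```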

// Manual error handling (if you can't use pipeline)
const fs = require('fs')

const readStream = fs.createReadStream('input.txt')
const writeStream = fs.createWriteStream('output.txt')

let cleanedUp = false
function onError(err) {
  if (cleanedUp) return // destroy(err) below re-emits 'error'; handle it only once
  cleanedUp = true
  readStream.destroy(err)
  writeStream.destroy(err)
  console.error('Stream error:', err.message)
}

readStream.on('error', onError)
writeStream.on('error', onError)
readStream.pipe(writeStream)

Security - DoS via Slow Loris on Streams

// VULNERABLE - unbounded stream accumulation
// (uses FrameParser from the protocol section above)
const http = require('http')

http.createServer((req, res) => {
  const parser = new FrameParser()
  // No size limit, no timeout, and no 'error' listener:
  // a malformed stream can even crash the whole process
  req.pipe(parser)

  const frames = []
  parser.on('data', (frame) => {
    frames.push(frame) // memory grows unbounded!
  })

  parser.on('end', () => {
    res.end(`Received ${frames.length} frames`)
  })
}).listen(3000)

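Slow loris sends data byte by byte, so each chunk can arrive as its own TCP segment, and it never quite completes a frame. If your parser holds partial frames in memory with no size or time limit, every such connection pins its buffer indefinitely, and across many connections memory grows without bound. The 4-byte length header makes it worse: a single client can declare a frame of up to 4 GiB (0xFFFFFFFF bytes), and FrameParser will keep buffering until that many bytes arrive. Re-concatenating the whole buffer on every tiny chunk also makes CPU cost grow quadratically with frame size.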

// DEFENSE - enforce size limits (extends FrameParser from above)
class SafeFrameParser extends FrameParser {
  constructor(maxFrameSize = 10 * 1024 * 1024) { // 10MB max
    super()
    this.maxFrameSize = maxFrameSize
  }

  _transform(chunk, encoding, callback) {
    // Let FrameParser extract every complete frame, then check what is still pending
    super._transform(chunk, encoding, (err) => {
      if (err) return callback(err)

      // Reject as soon as a header declares an oversized frame,
      // instead of buffering until that many bytes arrive
      if (this.frameLength > this.maxFrameSize || this.buffer.length > this.maxFrameSize) {
        return callback(new Error('Frame too large - possible DoS'))
      }
      callback()
    })
  }
}
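Size limits bound memory per connection, but a client that stays under the limit can still hold its connection open for as long as it likes; a time limit covers that case. This sketch uses stream.addAbortSignal() with a timer. withDeadline is a hypothetical helper and the 100 ms deadline is only for the demo; the approach works on any stream, such as a raw net.Socket or an HTTP request. A plain setTimeout is used so the timer can be cleared as soon as the stream closes, leaving nothing behind for connections that finish normally.

```javascript
const { addAbortSignal, Readable } = require('stream')

// Hypothetical helper: destroy `stream` with an AbortError if it is still open after `ms`
function withDeadline(stream, ms) {
  const ac = new AbortController()
  const timer = setTimeout(() => ac.abort(), ms)
  // Clear the timer once the stream closes, so finished streams leave nothing behind
  stream.once('close', () => clearTimeout(timer))
  return addAbortSignal(ac.signal, stream)
}

// Simulated Slowloris client: a stream that never delivers data and never ends
const trickle = withDeadline(new Readable({ read() {} }), 100)

trickle.on('error', (err) => console.log('deadline hit:', err.name)) // deadline hit: AbortError
trickle.on('close', () => console.log('closed; buffered data can be garbage-collected'))
trickle.resume()
```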

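Backpressure as DoS mitigation: when the consumer is slow, backpressure propagates back to the TCP socket. Node stops reading from it, the kernel receive buffer fills up, TCP flow control shrinks the advertised window, and the attacker's data rate drops. This only works if no stage buffers beyond its highWaterMark. A 'data' listener that copies every frame into an array (frames.push() in the vulnerable server) consumes as fast as data arrives, so backpressure never engages.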
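Backpressure does this work automatically if you consume the request one chunk at a time. A sketch, assuming a plain http server: consumeBounded is a hypothetical helper that reads with for await, so the next chunk is pulled only after onChunk settles, and it rejects once a byte budget is exceeded. The 1 MiB budget and the 10 s / 30 s timeouts are illustrative values, not recommendations; headersTimeout and requestTimeout are standard http.Server properties.

```javascript
const http = require('http')

// Hypothetical helper: read a stream one chunk at a time with a byte budget.
// for await requests the next chunk only after onChunk settles, so a slow
// handler lets the stream's buffer fill and Node stops reading the socket.
async function consumeBounded(stream, maxBytes, onChunk) {
  let received = 0
  for await (const chunk of stream) {
    received += chunk.length
    if (received > maxBytes) {
      // Throwing out of the loop destroys the stream
      throw new Error('Payload too large')
    }
    await onChunk(chunk)
  }
  return received
}

const server = http.createServer(async (req, res) => {
  try {
    const total = await consumeBounded(req, 1024 * 1024, async (chunk) => {
      // process the chunk here (parse, write to disk, ...)
    })
    res.end(`Received ${total} bytes\n`)
  } catch (err) {
    // req (and with it the socket) is already destroyed; make sure res is too
    res.destroy()
  }
})

// Bound how long a client may take to send headers and the whole request
server.headersTimeout = 10000
server.requestTimeout = 30000

// server.listen(3000)
```

Throwing inside for await destroys req, which in current Node versions also tears down the socket, so the handler simply drops an abusive connection rather than sending a 413.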
