Core 06 - Streams

Basic Idea

Files bigger than available RAM shouldn't crash your app. Streams process data in chunks, one piece at a time, so memory stays flat. Node's stream API is the swiss army knife for any I/O pipeline.

Stream Types - The Four Flavors

// The four base classes (plus pipeline) come from the 'stream' module
const { Readable, Writable, Transform, Duplex, pipeline } = require('stream')

// Readable: you pull data from it (file read, HTTP response)
// Writable: you push data into it (file write, HTTP request)
// Transform: reads input, transforms, writes output (gzip, encrypt)
// Duplex: both readable and writable (TCP socket)

Every stream in Node is one of these four types. If you understand these, you understand how half of Node works.
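
As a rough illustration of how the four types fit together, here is a minimal in-memory sketch. The helper names (makeSource, makeSink, makeUpper, isDuplex, runDemo) are made up for this example; only the stream classes and pipeline() come from Node.

```javascript
const { Readable, Writable, Transform, Duplex } = require('stream')
const { pipeline } = require('stream/promises')

// Readable: a source built from an in-memory array (hypothetical helper)
function makeSource() {
  return Readable.from(['a', 'b', 'c'])
}

// Writable: a sink that collects everything written to it (hypothetical helper)
function makeSink(collected) {
  return new Writable({
    write(chunk, encoding, callback) {
      collected.push(chunk.toString())
      callback()
    }
  })
}

// Transform: output is computed from input (hypothetical helper)
function makeUpper() {
  return new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase())
    }
  })
}

// Duplex: Transform is a Duplex whose output is derived from its input
function isDuplex(stream) {
  return stream instanceof Duplex
}

// Wires source -> transform -> sink and returns what the sink received
async function runDemo() {
  const collected = []
  await pipeline(makeSource(), makeUpper(), makeSink(collected))
  return collected
}
```

Calling runDemo() resolves with the uppercased chunks collected by the sink, and isDuplex(makeUpper()) shows that a Transform is a specialised Duplex.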

pipe() vs pipeline()

const fs = require('fs')

// pipe() - classic, but it does not forward errors or clean up
const readStream = fs.createReadStream('input.txt')
const writeStream = fs.createWriteStream('output.txt')
readStream.pipe(writeStream)
// if readStream errors, writeStream is never closed - file descriptor leak!
// if writeStream errors, readStream is unpiped but not destroyed - resource leak!

// pipeline() - the correct way
const { pipeline } = require('stream/promises')

async function copyFile() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      fs.createWriteStream('output.txt')
    )
    console.log('file copied cleanly')
  } catch (err) {
    console.error('copy failed:', err.message)
    // both streams are automatically destroyed
  }
}

pipe() was the original API. It's short, but it doesn't forward errors or clean up: when a pipe chain breaks, the upstream or downstream streams can be left open and leak. pipeline() destroys every stream in the chain on error and reports the failure in one place, which makes it the right default in 2025+.
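
The cleanup difference can be observed directly with in-memory streams. The sketch below is only an illustration: failingSource, nullSink, demoCleanup and demoPipe are hypothetical helpers, and the source is built to fail after one chunk.

```javascript
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// Hypothetical source: emits one chunk, then fails
function failingSource() {
  let sent = false
  return new Readable({
    read() {
      if (!sent) {
        sent = true
        this.push('first chunk')
      } else {
        this.destroy(new Error('source failed'))
      }
    }
  })
}

// Hypothetical sink: accepts and discards every chunk
function nullSink() {
  return new Writable({
    write(chunk, encoding, callback) { callback() }
  })
}

// pipeline(): the error is reported once and both ends are destroyed
async function demoCleanup() {
  const source = failingSource()
  const sink = nullSink()
  let message = null
  try {
    await pipeline(source, sink)
  } catch (err) {
    message = err.message
  }
  return { message, sourceDestroyed: source.destroyed, sinkDestroyed: sink.destroyed }
}

// pipe(): the source fails, but nobody closes the destination
function demoPipe() {
  return new Promise((resolve) => {
    const source = failingSource()
    const sink = nullSink()
    source.on('error', () => {}) // without a handler the error would crash the process
    source.on('close', () => resolve({ sinkDestroyed: sink.destroyed }))
    source.pipe(sink)
  })
}
```

With pipeline() the sink ends up destroyed; with pipe() it is still open after the source has failed, which is exactly the leak described above.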

Stream Modes - Flowing vs Paused

// PAUSED mode - the default. You call read() yourself
const stream = fs.createReadStream('file.txt')
// 'readable' fires when data is buffered; read() returns null once it is drained
stream.on('readable', () => {
  let chunk
  while ((chunk = stream.read()) !== null) {
    console.log('read', chunk.length, 'bytes')
  }
})
// stream.resume() switches to flowing mode
// stream.pause() switches back to paused

// FLOWING mode - data events fire automatically  
const stream2 = fs.createReadStream('file.txt')
stream2.on('data', (chunk) => {
  console.log('received', chunk.length, 'bytes')
  stream2.pause() // crude manual backpressure: stop the flow for 100ms
  setTimeout(() => stream2.resume(), 100)
})

In paused mode, you control when to read. In flowing mode, data flies at you as fast as the source produces it. Adding a 'data' listener, calling resume(), or calling pipe() switches to flowing mode. Removing the 'data' listener does not switch back on its own; call pause() to return to paused mode.
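
The current mode is visible through the readableFlowing property, which makes the switches easy to check. A small sketch, with flowingStates as a hypothetical helper name:

```javascript
const { Readable } = require('stream')

// Records readableFlowing after each mode switch (hypothetical helper)
function flowingStates() {
  const stream = Readable.from(['x', 'y'])
  const states = []

  states.push(stream.readableFlowing) // null: no consumer attached yet

  stream.on('data', () => {})         // attaching a 'data' listener starts the flow
  states.push(stream.readableFlowing) // true

  stream.pause()
  states.push(stream.readableFlowing) // false

  stream.resume()
  states.push(stream.readableFlowing) // true

  return states
}
```

The three values are distinct on purpose: null means nothing is consuming the stream yet, false means it was explicitly paused, and true means it is flowing.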

Backpressure - The Real Problem

const { Writable } = require('stream')
const { pipeline } = require('stream/promises')
const fs = require('fs')

// slow writable simulates backpressure
const slowWrite = new Writable({
  highWaterMark: 16 * 1024, // 16KB buffering threshold
  write(chunk, encoding, callback) {
    // simulate slow disk: acknowledge each chunk after 10ms
    setTimeout(callback, 10)
  }
})

// fast readable pushes data
const fastRead = fs.createReadStream('large-file.bin', {
  highWaterMark: 64 * 1024 // 64KB chunks
})

// if readable outruns writable - internal buffer fills up
// pipe/pipeline handle this automatically

// manual approach - check drain
const writable = fs.createWriteStream('output.bin')
writable.on('drain', () => {
  console.log('buffer drained - safe to write more')
})

highWaterMark sets the buffering threshold in bytes (or in objects for object-mode streams); it is a signal, not a hard cap. Once the writable's buffered data reaches highWaterMark, write() returns false. That's the signal to stop pushing data until the 'drain' event fires. pipe() and pipeline() handle backpressure automatically - one of the main reasons to use them.
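
When writing by hand instead of piping, the write()/'drain' contract looks roughly like this. It is a sketch under assumptions: makeSlowSink and writeAll are hypothetical helpers, and the 4-byte highWaterMark is deliberately tiny so that write() returns false almost immediately.

```javascript
const { Writable } = require('stream')
const { once } = require('events')

// Hypothetical slow sink with a tiny buffering threshold
function makeSlowSink(received) {
  return new Writable({
    highWaterMark: 4, // bytes
    write(chunk, encoding, callback) {
      received.push(chunk.toString())
      setTimeout(callback, 1) // pretend the disk is slow
    }
  })
}

// Writes every chunk, pausing whenever the sink signals backpressure
async function writeAll(writable, chunks) {
  let waits = 0
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++
      await once(writable, 'drain') // resume only after the buffer empties
    }
  }
  const finished = once(writable, 'finish') // listen before calling end()
  writable.end()
  await finished
  return waits // how many times the producer had to wait
}
```

Ignoring the false return value would still deliver the data, but every chunk would pile up in the writable's internal buffer instead of waiting at the source.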

Creating Custom Transforms

const { Transform } = require('stream')

// uppercase transform - silly example but shows the pattern
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // chunk is a Buffer - convert to string, transform, push
    const upper = chunk.toString().toUpperCase()
    this.push(upper)
    callback()
  }

  _flush(callback) {
    // called before stream ends - for final data
    this.push('\n--- TRANSFORM COMPLETE ---\n')
    callback()
  }
}

const { pipeline } = require('stream/promises')
const fs = require('fs')

async function upperCaseFile() {
  await pipeline(
    fs.createReadStream('input.txt'),
    new UpperCaseTransform(),
    fs.createWriteStream('output.txt')
  )
  console.log('file uppercased')
}

_transform(chunk, encoding, callback) receives each chunk. Call this.push(data) to pass transformed data downstream. Call callback() when ready for the next chunk, or callback(err) to fail the stream. _flush(callback) fires before the stream ends - for final cleanup or trailing data.
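
A case where _flush really matters is re-chunking: input chunks rarely line up with record boundaries, so the last partial record has to be emitted at the end. The sketch below uses the constructor-options form (transform / flush) instead of a class. createLineSplitter and splitLines are hypothetical names, and it assumes text input where chunk boundaries never split a multi-byte character.

```javascript
const { Readable, Transform, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// Hypothetical helper: re-chunks incoming text so each output chunk is one line
function createLineSplitter() {
  let leftover = ''
  return new Transform({
    readableObjectMode: true, // emit one string per line
    transform(chunk, encoding, callback) {
      const lines = (leftover + chunk.toString()).split('\n')
      leftover = lines.pop() // last piece may be an incomplete line
      for (const line of lines) this.push(line)
      callback()
    },
    flush(callback) {
      if (leftover) this.push(leftover) // trailing data without a final newline
      callback()
    }
  })
}

// Runs the splitter over in-memory chunks and returns the lines it produced
async function splitLines(chunks) {
  const lines = []
  await pipeline(
    Readable.from(chunks),
    createLineSplitter(),
    new Writable({
      objectMode: true,
      write(line, encoding, callback) {
        lines.push(line)
        callback()
      }
    })
  )
  return lines
}
```

For example, splitLines(['alpha\nbe', 'ta\ngam', 'ma']) resolves to the three lines alpha, beta and gamma; the last one only appears because flush pushes the leftover.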

Real Example: File Encrypt/Decrypt Pipeline

const { Transform } = require('stream')
const { pipeline } = require('stream/promises')
const fs = require('fs')
const crypto = require('crypto')

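const ALGORITHM = 'aes-256-cbc'
// demo only: the random key and IV exist just for this process, so the
// encrypted file cannot be decrypted after a restart
const KEY = crypto.randomBytes(32)
const IV = crypto.randomBytes(16)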

class EncryptStream extends Transform {
  constructor(key, iv) {
    super()
    this.cipher = crypto.createCipheriv(ALGORITHM, key, iv)
  }

  _transform(chunk, encoding, callback) {
    try {
      const encrypted = this.cipher.update(chunk)
      this.push(encrypted)
      callback()
    } catch (err) {
      callback(err)
    }
  }

  _flush(callback) {
    try {
      const final = this.cipher.final()
      this.push(final)
      callback()
    } catch (err) {
      callback(err)
    }
  }
}

class DecryptStream extends Transform {
  constructor(key, iv) {
    super()
    this.decipher = crypto.createDecipheriv(ALGORITHM, key, iv)
  }

  _transform(chunk, encoding, callback) {
    try {
      const decrypted = this.decipher.update(chunk)
      this.push(decrypted)
      callback()
    } catch (err) {
      callback(err)
    }
  }

  _flush(callback) {
    try {
      const final = this.decipher.final()
      this.push(final)
      callback()
    } catch (err) {
      callback(err)
    }
  }
}

async function encryptFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    new EncryptStream(KEY, IV),
    fs.createWriteStream(output)
  )
}

async function decryptFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    new DecryptStream(KEY, IV),
    fs.createWriteStream(output)
  )
}

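// this file uses require() (CommonJS), where top-level await is a syntax error,
// so run the round trip inside an async function
async function main() {
  await encryptFile('secret.txt', 'secret.enc')
  await decryptFile('secret.enc', 'secret_decrypted.txt')
}

main().catch((err) => console.error('pipeline failed:', err.message))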
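
Worth knowing: the objects returned by crypto.createCipheriv() and crypto.createDecipheriv() are themselves Transform streams, so they can be dropped into pipeline() directly without a wrapper class. The in-memory round trip below is a sketch; runThrough and roundTrip are hypothetical helpers, and the key and IV are random throwaway values.

```javascript
const crypto = require('crypto')
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

const ALGORITHM = 'aes-256-cbc'

// Hypothetical helper: sends chunks through a transform, returns one Buffer
async function runThrough(chunks, transform) {
  const parts = []
  await pipeline(
    Readable.from(chunks),
    transform,
    new Writable({
      write(chunk, encoding, callback) {
        parts.push(chunk)
        callback()
      }
    })
  )
  return Buffer.concat(parts)
}

// Encrypts and decrypts a string using the cipher objects as streams
async function roundTrip(text) {
  const key = crypto.randomBytes(32) // throwaway key for the demo
  const iv = crypto.randomBytes(16)  // throwaway IV for the demo
  const encrypted = await runThrough(
    [Buffer.from(text)],
    crypto.createCipheriv(ALGORITHM, key, iv)
  )
  const decrypted = await runThrough(
    [encrypted],
    crypto.createDecipheriv(ALGORITHM, key, iv)
  )
  return { encrypted, decrypted: decrypted.toString() }
}
```

The custom EncryptStream / DecryptStream classes are still a useful exercise in writing Transforms; in application code the built-in stream behaviour is usually enough.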

Security: DoS via Stream Bomb

// DANGER - unbounded upload: the body is streamed to disk with no size limit
const http = require('http')
const fs = require('fs')

http.createServer((req, res) => {
  const ws = fs.createWriteStream('/tmp/upload.bin')
  req.pipe(ws)
  // attacker sends infinite data - fills disk - DoS
}).listen(3000)

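// DEFENSE - enforce a size limit at the application boundary
// (use this server instead of the one above)
const { pipeline } = require('stream')

http.createServer((req, res) => {
  let total = 0
  const MAX_SIZE = 100 * 1024 * 1024 // 100MB

  req.on('data', (chunk) => {
    total += chunk.length
    if (total > MAX_SIZE) {
      // destroying req makes pipeline() below fail and close the file
      req.destroy(new Error('upload too large'))
    }
  })

  pipeline(req, fs.createWriteStream('/tmp/upload.bin'), (err) => {
    if (err) {
      // the socket is gone, so just remove the partial file
      fs.unlink('/tmp/upload.bin', () => {})
      return
    }
    res.end('upload complete')
  })
}).listen(3000)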

Stream bombs come in two flavors: infinite data (unbounded uploads) and zip bombs (tiny input, massive output). Without size limits, an attacker fills disk or memory until the process dies. Always enforce total size limits at the application boundary; highWaterMark only bounds how much each stream buffers, not how much data passes through in total.

Another attack: a stream that produces data faster than the consumer can process. If you skip backpressure handling, memory grows unbounded until the process OOMs.
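
One way to enforce a total size limit inside a pipeline is a small pass-through Transform that counts bytes and fails once a budget is exceeded. Placed after a decompression step, the same idea caps the expanded output of a zip bomb. This is a sketch, not a hardened implementation; createByteLimit and copyWithLimit are hypothetical names.

```javascript
const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

// Hypothetical helper: passes data through until maxBytes is exceeded
function createByteLimit(maxBytes) {
  let total = 0
  return new Transform({
    transform(chunk, encoding, callback) {
      total += chunk.length
      if (total > maxBytes) {
        // failing the transform makes pipeline() destroy every stream in the chain
        callback(new Error('stream exceeded ' + maxBytes + ' bytes'))
        return
      }
      callback(null, chunk)
    }
  })
}

// Copies source to sink, rejecting if more than maxBytes flow through
async function copyWithLimit(source, sink, maxBytes) {
  await pipeline(source, createByteLimit(maxBytes), sink)
}
```

Because the limiter is just another stage, backpressure still flows through it, and the rejection from pipeline() is the single place to delete partial output or log the event.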

Summary

  • Use pipeline() not pipe() - error cleanup is non-negotiable
  • Backpressure prevents OOM - let streams handle it automatically
  • Transform for processing data between read and write
  • Always enforce size limits in production - stream bombs are real
  • Four types: Readable, Writable, Transform, Duplex

Prerequisites

next -> core_07_zlib.md