Message Queues

When your API starts handling requests that take 30 seconds or more (video transcoding, PDF generation, bulk email sending), you can't keep the HTTP connection open that long: the client times out, the load balancer kills the connection, and your user stares at a loading spinner until they rage-quit. Message queues fix this: you accept the request, put a job on the queue, return "202 Accepted" immediately, and a worker processes the job asynchronously. The user gets a response in milliseconds, and the heavy work happens in the background.
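To make the accept-then-process flow concrete, here is a minimal in-memory sketch. The names `acceptJob` and `drain` are hypothetical, and the array stands in for a real broker only for illustration; it loses jobs on restart, which is the problem a real queue solves.

```javascript
// In-memory stand-in for a broker, for illustration only.
const pending = []
const results = new Map()
let nextId = 1

// Hypothetical handler body: enqueue the job and answer immediately.
function acceptJob(payload) {
  const jobId = String(nextId++)
  pending.push({ jobId, payload })
  return { status: 202, body: { jobId, status: 'queued' } }
}

// Hypothetical worker loop: process queued jobs one at a time.
async function drain(handler) {
  while (pending.length > 0) {
    const { jobId, payload } = pending.shift()
    results.set(jobId, await handler(payload))
  }
}
```

The response is built before any work runs, so the caller's latency depends only on the enqueue step. The result appears later, under the job ID the client was given.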

Why Queues Matter

flowchart TD
    subgraph Without_Queue[Without Queue]
        C1[Client] --> POST1[POST /api/report]
        POST1 --> API[API processes for 60s]
        API --> R1[Response]
        POST1 -.->|Client timeout| TO[Too late]
    end

    subgraph With_Queue[With Queue]
        C2[Client] --> POST2[POST /api/report]
        POST2 --> Queue[API puts job on queue]
        Queue --> Accepted[202 Accepted]
        Queue --> Worker[Worker picks up job]
        Worker --> Report[Generates report - 60s]
        Report --> Store[Stores result, notifies client]
    end

RabbitMQ — The Industry Standard

RabbitMQ is a battle-tested message broker that handles millions of messages per day.

# Install with Docker
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:4-management

# Management UI at http://localhost:15672

const amqp = require('amqplib')

async function setup() {
  const connection = await amqp.connect('amqp://admin:admin@localhost:5672')

  // Channel — most operations happen on channels
  const channel = await connection.createChannel()

  // Declare queue (idempotent — creates if doesn't exist)
  await channel.assertQueue('report-generation', {
    durable: true     // Queue survives broker restart
  })

  // Prefetch: each consumer holds at most 3 unacknowledged messages at a time
  await channel.prefetch(3)

  return { connection, channel }
}

// Publisher — API server puts jobs on the queue
async function publishJob(channel, jobData) {
  channel.sendToQueue('report-generation', Buffer.from(JSON.stringify(jobData)), {
    persistent: true,         // Message survives broker restart
    contentType: 'application/json',
    timestamp: Date.now()
  })

  console.log('Published job:', jobData.id)
}

// Consumer — worker processes jobs from the queue
async function startWorker(channel) {
  channel.consume('report-generation', async (msg) => {
    if (!msg) return

    const job = JSON.parse(msg.content.toString())
    console.log('Processing job:', job.id)

    try {
      await generateReport(job)

      // Acknowledge — remove from queue
      channel.ack(msg)
      console.log('Job completed:', job.id)
    } catch (err) {
      console.error('Job failed:', job.id, err.message)

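      // A requeued message comes back unchanged, so a counter inside the
      // payload never advances. Republish a copy with retryCount incremented
      // and ack the original instead.
      const retryCount = job.retryCount || 0
      if (retryCount < 3) {
        channel.sendToQueue('report-generation',
          Buffer.from(JSON.stringify({ ...job, retryCount: retryCount + 1 })),
          { persistent: true })
        channel.ack(msg)
      } else {
        // nack(msg, allUpTo, requeue): reject only this message, don't requeue.
        // The broker dead-letters it only if the queue has a dead letter exchange configured.
        channel.nack(msg, false, false)
      }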
    }
  })

  console.log('Worker started, waiting for jobs...')
}
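One detail in the worker deserves a closer look: rejecting with requeue returns the message to the queue unchanged, so a retry counter stored in the payload only advances if the worker publishes a new copy. The sketch below isolates that decision as a pure function. `planRetry` and the `retryCount` field convention are assumptions for illustration, not part of amqplib.

```javascript
// Decide what to do with a failed job.
// Returns either a new job copy to republish or a dead-letter decision.
function planRetry(job, maxRetries = 3) {
  const retryCount = job.retryCount || 0
  if (retryCount >= maxRetries) {
    return { action: 'dead-letter' }
  }
  // Copy the job rather than mutating it, and bump the counter.
  return { action: 'republish', job: { ...job, retryCount: retryCount + 1 } }
}
```

In a worker, `republish` would map to `channel.sendToQueue(...)` followed by `channel.ack(msg)`, and `dead-letter` to `channel.nack(msg, false, false)`.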

Bull — Redis-Backed Queues for Node.js

Bull is the go-to queue library for Node.js when you're already running Redis.

npm install bull

const Queue = require('bull')

// Create a queue
const reportQueue = new Queue('report-generation', {
  redis: {
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    attempts: 3,                    // Retry up to 3 times
    backoff: {
      type: 'exponential',          // Wait longer between retries
      delay: 2000                   // Start with 2 seconds
    },
    removeOnComplete: 100,          // Keep last 100 completed jobs
    removeOnFail: 50                // Keep last 50 failed jobs
  }
})

// Producer — add jobs to the queue
app.post('/api/reports', async (req, res) => {
  const job = await reportQueue.add({
    userId: req.user.id,
    reportType: req.body.type,
    parameters: req.body.parameters,
    createdAt: Date.now()
  }, {
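    jobId: `${req.user.id}-${Date.now()}`,    // Custom ID; the timestamp makes it unique per request, so repeated requests are not deduplicated
    priority: req.user.isPremium ? 1 : 10,    // Premium users go first (lower number = higher priority)
    delay: req.body.scheduleAt ? Math.max(0, new Date(req.body.scheduleAt) - Date.now()) : 0  // Clamp past times to 0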
  })

  res.status(202).json({
    jobId: job.id,
    status: 'queued'
  })
})

// Consumer — process jobs
reportQueue.process(async (job) => {
  const { userId, reportType, parameters } = job.data

  // Progress reporting (0-100)
  job.progress(10)

  const data = await fetchReportData(userId, parameters)
  job.progress(50)

  const report = await generateReport(reportType, data)
  job.progress(90)

  await storeReport(userId, report)
  job.progress(100)

  return { reportId: report.id, url: report.url }
})

// Event handlers
reportQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed: ${result.reportId}`)
  // Notify user that report is ready
  notifyUser(job.data.userId, 'report.ready', { reportId: result.reportId })
})

reportQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed after ${job.attemptsMade} attempts:`, err.message)
})

reportQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled — worker may have crashed`)
})
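A job ID that includes `Date.now()` is different on every request, so it cannot deduplicate anything. If the goal is to collapse identical requests into one job, the ID has to be derived from the request content. The following is a sketch under that assumption; `stableStringify` and `dedupJobId` are hypothetical helpers, not part of Bull.

```javascript
const crypto = require('crypto')

// Serialize with sorted object keys so logically equal payloads
// always produce the same string.
function stableStringify(value) {
  if (Array.isArray(value)) {
    return '[' + value.map(stableStringify).join(',') + ']'
  }
  if (value && typeof value === 'object') {
    const keys = Object.keys(value).sort()
    return '{' + keys.map((k) => JSON.stringify(k) + ':' + stableStringify(value[k])).join(',') + '}'
  }
  return JSON.stringify(value)
}

// Same user + same report request => same job ID.
function dedupJobId(userId, reportType, parameters) {
  const digest = crypto
    .createHash('sha256')
    .update(stableStringify({ userId, reportType, parameters }))
    .digest('hex')
  // The prefix keeps the ID from looking like one of Bull's default integer IDs.
  return `report-${digest.slice(0, 32)}`
}
```

Passing this value as `jobId` means a second identical request should not create a second job while the first one still exists in Redis. Once a job is removed (for example by `removeOnComplete`), the same ID can be added again.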

Job Scheduling — Delayed and Recurring Jobs

// Delayed job — run after specific delay
await reportQueue.add(data, {
  delay: 3600000    // Run in 1 hour
})

// Schedule for specific time
const scheduledTime = new Date('2026-06-06T10:00:00Z')
await reportQueue.add(data, {
  delay: scheduledTime.getTime() - Date.now()
})

// Recurring jobs (Bull repeatable jobs)
// redisConfig is assumed to be an options object like { redis: { host, port, password } }
const dailyCleanup = new Queue('cleanup', redisConfig)

// Run every day at midnight
await dailyCleanup.add({ task: 'cleanup-logs' }, {
  repeat: {
    cron: '0 0 * * *',
    tz: 'America/New_York'
  }
})
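Computing a delay from a target time has two easy failure modes: a time in the past gives a negative number, and an unparseable date gives `NaN`. A small helper can guard against both. `delayUntil` is a hypothetical name used for illustration.

```javascript
// Milliseconds from `now` until `when`, clamped at zero so a timestamp
// in the past runs immediately instead of producing a negative delay.
function delayUntil(when, now = Date.now()) {
  const target = new Date(when).getTime()
  if (Number.isNaN(target)) {
    throw new RangeError(`Invalid schedule time: ${when}`)
  }
  return Math.max(0, target - now)
}
```

The result could then be passed as the `delay` option, for example `reportQueue.add(data, { delay: delayUntil('2026-06-06T10:00:00Z') })`.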

Queue Monitoring — Know What's Happening

Bull Arena (web UI for Bull queues):

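const Arena = require('bull-arena')
const Bull = require('bull')

// Recent bull-arena versions expect the queue library constructor in the config
Arena({
  Bull,
  queues: [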
    {
      name: 'report-generation',
      hostId: 'Redis',
      redis: { host: 'localhost', port: 6379 }
    },
    {
      name: 'email-sending',
      hostId: 'Redis',
      redis: { host: 'localhost', port: 6379 }
    }
  ]
})

Metrics to watch:

* Queue depth: how many jobs are waiting? A growing number means workers can't keep up.
* Active jobs: how many are processing right now?
* Failed jobs: rate of failures and types of failures.
* Job duration: how long does each job take? Are there outliers?
* Stalled jobs: are workers crashing? Were jobs picked up but never completed?
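Several of these metrics can be derived from a single snapshot of job counts. The sketch below assumes an input shaped like the object Bull's `getJobCounts()` resolves to (`waiting`, `active`, `completed`, `failed`, `delayed`). `summarizeQueue` and its thresholds are hypothetical. Because `removeOnComplete` and `removeOnFail` cap how many finished jobs are kept, the failure rate computed this way is only approximate.

```javascript
// Turn a job-count snapshot into a small health summary.
function summarizeQueue(counts, { maxWaiting = 1000, maxFailureRate = 0.05 } = {}) {
  const finished = counts.completed + counts.failed
  // Avoid dividing by zero on a queue that has not finished anything yet.
  const failureRate = finished === 0 ? 0 : counts.failed / finished
  const alerts = []
  if (counts.waiting > maxWaiting) alerts.push('backlog')
  if (failureRate > maxFailureRate) alerts.push('failure-rate')
  return { depth: counts.waiting, active: counts.active, failureRate, alerts }
}
```

Job duration and stalled jobs are not visible in counts alone; they would come from the `completed` and `stalled` events or from timestamps on the job.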

Backpressure — When Workers Can't Keep Up

Backpressure happens when producers push jobs faster than consumers can process them.

Symptoms:

* Queue depth grows continuously
* Job latency increases (jobs wait longer before processing)
* Memory usage on Redis/broker increases
* Eventually the broker runs out of memory or hits storage limits

Solutions:

* Add more workers (horizontal scaling)
* Rate limit producers
* Drop lower-priority jobs
* Use a dead letter queue for jobs that can't be processed

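// Throttle how fast workers process jobs. Note: Bull's limiter does not cap
// queue size. It protects downstream systems, and waiting jobs still accumulate.
const queue = new Queue('reports', {
  limiter: {
    max: 100,      // Max jobs processed per duration
    duration: 1000 // Duration in milliseconds (1 second)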
  }
})

// Poll the queue counts and alert when the backlog grows
setInterval(async () => {
  const count = await queue.getJobCounts()

  if (count.waiting > 1000) {
    console.error('Queue backlog critical:', count.waiting)
    alertOperations('Queue backlog critical')
  }
}, 30000)
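The "rate limit producers" and "drop lower-priority jobs" options can be combined into an admission check that the API runs before enqueueing. This is a sketch only: `admit`, the limits, and the status codes are assumptions, and the `waiting` value would come from something like `getJobCounts()`.

```javascript
// Hypothetical admission check run by the API before adding a job.
// Lower `priority` numbers are more important, as in the producer example.
function admit(waiting, priority, { softLimit = 1000, hardLimit = 5000 } = {}) {
  if (waiting >= hardLimit) {
    // Queue is full for everyone: tell clients to come back later.
    return { accept: false, status: 503, reason: 'queue full' }
  }
  if (waiting >= softLimit && priority > 1) {
    // Backlog is building: shed low-priority work first.
    return { accept: false, status: 429, reason: 'backlog, low priority shed' }
  }
  return { accept: true }
}
```

Rejecting at the edge keeps the backlog bounded and gives clients a clear signal to retry, instead of letting latency grow silently.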

Security — Don't Let Anyone Publish to Your Queue

Queue authentication:

# RabbitMQ with TLS + credentials
services:
  rabbitmq:
    image: rabbitmq:4-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
    ports:
      - "5671:5671"    # TLS port
    volumes:
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf

# RabbitMQ config for TLS (rabbitmq.conf)
# The TLS listener must be enabled explicitly
listeners.ssl.default = 5671
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
ssl_options.cacertfile = /etc/rabbitmq/ca.pem
ssl_options.certfile = /etc/rabbitmq/server.pem
ssl_options.keyfile = /etc/rabbitmq/key.pem

Queue ACLs (who can read/write):

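# Argument order: <user> <configure> <write> <read>
# Producer: may only publish (write) to the default exchange
rabbitmqctl set_permissions -p / producer "" "^amq\.default$" ""
# Consumer: may only consume (read) from the queue
rabbitmqctl set_permissions -p / consumer "" "" "^reports-queue$"
# Neither user has configure permission, so the queue must be declared by an account that does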

Never expose queue ports to the internet:

* RabbitMQ management port 15672 should be internal only
* Redis port 6379 should never be publicly accessible
* Queue connections should use TLS in production
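On the client side, using TLS mostly means connecting with an `amqps://` URL on port 5671 and keeping credentials out of the source. The sketch below builds such a URL from environment-style settings. `amqpsUrl`, `RABBITMQ_HOST`, and `RABBITMQ_USER` are assumed names; only `RABBITMQ_PASSWORD` appears in the compose file above.

```javascript
// Build an amqps:// connection URL from environment-style settings.
function amqpsUrl(env) {
  if (!env.RABBITMQ_PASSWORD) {
    throw new Error('RABBITMQ_PASSWORD is not set')
  }
  // Percent-encode credentials so characters like @ or / don't break the URL.
  const user = encodeURIComponent(env.RABBITMQ_USER || 'admin')
  const pass = encodeURIComponent(env.RABBITMQ_PASSWORD)
  const host = env.RABBITMQ_HOST || 'localhost'
  return `amqps://${user}:${pass}@${host}:5671`
}
```

The URL would be passed to `amqp.connect(amqpsUrl(process.env), socketOptions)`. Because the broker config requires a peer certificate, `socketOptions` would likely need `ca`, `cert`, and `key` entries read from the client's certificate files.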
