Message Queues
When your API starts handling requests that take 30 seconds (video transcoding, PDF generation, bulk email sending), you can't keep the HTTP connection open that long: the client times out, the load balancer kills the connection, and your user stares at a loading spinner until they rage-quit. Message queues fix this. You accept the request, put a job on the queue, return "202 Accepted" immediately, and a worker processes the job asynchronously. The user gets a response in milliseconds while the heavy work happens in the background.
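The accept/enqueue/respond flow can be sketched without any broker at all. This is a minimal in-memory illustration, not production code: an array stands in for the queue, and handleReportRequest, runWorker, jobs and results are hypothetical names. Because everything lives in one process, queued jobs vanish on restart and can't be shared across worker machines, which is why the rest of this section uses a real broker.

```javascript
const { randomUUID } = require('crypto')

const jobs = []           // Pending jobs, oldest first (stand-in for a real queue)
const results = new Map() // jobId -> { status, result?, error? }

// Hypothetical API handler: enqueue the job and answer right away
function handleReportRequest(params) {
  const jobId = randomUUID()
  jobs.push({ jobId, params })
  results.set(jobId, { status: 'queued' })
  return { statusCode: 202, body: { jobId, status: 'queued' } }
}

// Hypothetical worker: takes jobs off the queue and does the slow part
async function runWorker(doWork) {
  while (jobs.length > 0) {
    const { jobId, params } = jobs.shift()
    results.set(jobId, { status: 'processing' })
    try {
      const result = await doWork(params)
      results.set(jobId, { status: 'done', result })
    } catch (err) {
      results.set(jobId, { status: 'failed', error: err.message })
    }
  }
}

// Usage: the client gets its 202 before any work has started
const response = handleReportRequest({ type: 'sales' })
console.log(response.statusCode, results.get(response.body.jobId).status) // 202 queued
```

A client would then poll for the job's status (or be notified) using the returned jobId.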
Why Queues Matter
flowchart TD
    subgraph Without_Queue[Without Queue]
        C1[Client] --> POST1[POST /api/report]
        POST1 --> API[API processes for 60s]
        API --> R1[Response]
        POST1 -.->|Client timeout| TO[Too late]
    end
    subgraph With_Queue[With Queue]
        C2[Client] --> POST2[POST /api/report]
        POST2 --> Queue[API puts job on queue]
        Queue --> Accepted[202 Accepted]
        Queue --> Worker[Worker picks up job]
        Worker --> Report[Generates report - 60s]
        Report --> Store[Stores result, notifies client]
    end

RabbitMQ: The Industry Standard
RabbitMQ is a battle-tested, open-source message broker. It implements AMQP 0-9-1, the protocol spoken by the amqplib Node.js client used below.
# Run locally with Docker (admin/admin credentials are for local development only)
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:4-management
# Management UI at http://localhost:15672
const amqp = require('amqplib')

async function setup() {
  const connection = await amqp.connect('amqp://admin:admin@localhost:5672')
  // Channel: most AMQP operations (declare, publish, consume) happen on a channel
  const channel = await connection.createChannel()
  // Declare the queue (idempotent: creates it only if it doesn't exist)
  await channel.assertQueue('report-generation', {
    durable: true // Queue definition survives a broker restart
  })
  // Prefetch: deliver at most 3 unacknowledged messages to each consumer on this channel
  await channel.prefetch(3)
  return { connection, channel }
}
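// Publisher: the API server puts jobs on the queue
async function publishJob(channel, jobData) {
  // sendToQueue returns false when the channel's write buffer is full;
  // wait for the channel's 'drain' event before publishing more
  const ok = channel.sendToQueue('report-generation', Buffer.from(JSON.stringify(jobData)), {
    persistent: true, // Survives a broker restart (only because the queue is durable too)
    contentType: 'application/json',
    timestamp: Math.floor(Date.now() / 1000) // AMQP timestamps are in seconds, not milliseconds
  })
  console.log('Published job:', jobData.id)
  return ok
}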
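// Consumer: the worker processes jobs from the queue
const MAX_RETRIES = 3

async function startWorker(channel) {
  await channel.consume('report-generation', async (msg) => {
    if (!msg) return // The broker cancelled this consumer

    let job
    try {
      job = JSON.parse(msg.content.toString())
    } catch (err) {
      // Malformed message: retrying won't help, so reject without requeueing
      console.error('Discarding malformed message:', err.message)
      channel.nack(msg, false, false)
      return
    }
    console.log('Processing job:', job.id)

    try {
      await generateReport(job)
      channel.ack(msg) // Acknowledge: the broker removes the message
      console.log('Job completed:', job.id)
    } catch (err) {
      console.error('Job failed:', job.id, err.message)
      const retryCount = job.retryCount || 0
      if (retryCount < MAX_RETRIES) {
        // nack with requeue=true would redeliver the same body, so retryCount
        // would never change; republish a copy with the counter bumped instead
        channel.sendToQueue('report-generation',
          Buffer.from(JSON.stringify({ ...job, retryCount: retryCount + 1 })),
          { persistent: true, contentType: 'application/json' })
        channel.ack(msg)
      } else {
        // requeue=false: the message is dropped, or dead-lettered if the
        // queue was declared with a dead-letter exchange
        channel.nack(msg, false, false)
      }
    }
  })
  console.log('Worker started, waiting for jobs...')
}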
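Requeueing with nack(msg, false, true) hands the broker back the unchanged message, so a retry counter stored in the body never advances on its own. The decision a worker has to make on failure can be isolated in a small pure function. decideOnFailure and MAX_RETRIES are hypothetical names, the limit of 3 follows the worker example, and 'reject' stands for nack(msg, false, false), which drops the message or dead-letters it if the queue has a dead-letter exchange configured.

```javascript
const MAX_RETRIES = 3 // Assumed limit of 3 retries, as in the worker example

// Decide what to do with a job whose processing just failed.
// Returns a republish instruction (with the counter bumped) or a rejection.
function decideOnFailure(job) {
  const retryCount = job.retryCount || 0
  if (retryCount < MAX_RETRIES) {
    // Caller publishes this copy, then acks the original delivery
    return { action: 'republish', job: { ...job, retryCount: retryCount + 1 } }
  }
  // Caller does nack(msg, false, false): drop, or dead-letter if a DLX exists
  return { action: 'reject' }
}

// Usage: simulate a job that fails every time
let job = { id: 'report-42' }
const actions = []
for (;;) {
  const decision = decideOnFailure(job)
  actions.push(decision.action)
  if (decision.action !== 'republish') break
  job = decision.job
}
console.log(actions.join(', ')) // republish, republish, republish, reject
```

A job that always fails is therefore attempted four times (the first delivery plus three retries) before it is rejected. Keeping the decision pure makes the retry policy testable without a broker.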
Bull: Redis-Backed Queues for Node.js
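Bull is a popular queue library for Node.js when you're already running Redis. Note that Bull is in maintenance mode (bug fixes only); its maintainers point new projects to BullMQ, which has a similar but not identical API.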
npm install bull
const Queue = require('bull')

// Create a queue
const reportQueue = new Queue('report-generation', {
  redis: {
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    attempts: 3, // Up to 3 attempts in total (the first try plus 2 retries)
    backoff: {
      type: 'exponential', // Wait longer after each failed attempt
      delay: 2000 // Base delay of 2 seconds
    },
    removeOnComplete: 100, // Keep only the last 100 completed jobs
    removeOnFail: 50 // Keep only the last 50 failed jobs
  }
})
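// Producer: add jobs to the queue (Express app with auth middleware setting req.user)
app.post('/api/reports', async (req, res) => {
  const scheduleAt = req.body.scheduleAt ? new Date(req.body.scheduleAt).getTime() : null
  if (Number.isNaN(scheduleAt)) {
    return res.status(400).json({ error: 'scheduleAt must be a valid date' })
  }
  try {
    const job = await reportQueue.add({
      userId: req.user.id,
      reportType: req.body.type,
      parameters: req.body.parameters,
      createdAt: Date.now()
    }, {
      // Custom job ID. Bull won't add a job whose ID already exists, but a
      // timestamp-based ID is almost always unique, so this does not deduplicate
      jobId: `${req.user.id}-${Date.now()}`,
      priority: req.user.isPremium ? 1 : 10, // 1 is Bull's highest priority: premium users go first
      delay: scheduleAt ? Math.max(0, scheduleAt - Date.now()) : 0 // Past times run immediately
    })
    res.status(202).json({
      jobId: job.id,
      status: 'queued'
    })
  } catch (err) {
    // e.g. Redis is unreachable
    res.status(503).json({ error: 'Could not queue report' })
  }
})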
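// Consumer: process jobs
// (fetchReportData, generateReport and storeReport stand in for your own code)
reportQueue.process(async (job) => {
  const { userId, reportType, parameters } = job.data
  // Progress reporting (0-100); job.progress() returns a promise
  await job.progress(10)
  const data = await fetchReportData(userId, parameters)
  await job.progress(50)
  const report = await generateReport(reportType, data)
  await job.progress(90)
  await storeReport(userId, report)
  await job.progress(100)
  // The return value is stored as the job's result and passed to 'completed' handlers
  return { reportId: report.id, url: report.url }
})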
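// Event handlers (local events: they fire for jobs processed by this process)
reportQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed: ${result.reportId}`)
  // Notify the user that the report is ready (notifyUser is your own helper)
  notifyUser(job.data.userId, 'report.ready', { reportId: result.reportId })
})
reportQueue.on('failed', (job, err) => {
  // 'failed' fires after every failed attempt, including ones that will be retried
  const willRetry = job.attemptsMade < (job.opts.attempts || 1)
  console.error(`Job ${job.id} failed on attempt ${job.attemptsMade}${willRetry ? ' (will retry)' : ' (giving up)'}:`, err.message)
})
reportQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled: its worker may have crashed or blocked the event loop`)
})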
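Three of the job options above hide arithmetic worth spelling out. The sketch below uses hypothetical helpers (backoffSchedule, computeDelay, reportJobId) to show that attempts: 3 allows at most two retries, that a scheduleAt value has to become a non-negative number of milliseconds, and what a jobId that actually deduplicates could look like. The doubling schedule is a generic exponential backoff for illustration; Bull's exact formula may differ, so check its documentation before relying on specific numbers.

```javascript
const { createHash } = require('crypto')

// Waits before each retry under a simple doubling backoff.
// `attempts` counts the first try, so there are attempts - 1 retries.
// Generic exponential backoff, not necessarily Bull's exact formula.
function backoffSchedule(attempts, baseDelayMs) {
  const waits = []
  for (let retry = 1; retry < attempts; retry++) {
    waits.push(baseDelayMs * 2 ** (retry - 1))
  }
  return waits
}

// Convert an optional scheduleAt value into a Bull `delay` in milliseconds.
// Missing values and past times mean "run now"; unparseable input is
// rejected instead of being passed through as NaN.
function computeDelay(scheduleAt, now = Date.now()) {
  if (!scheduleAt) return 0
  const target = new Date(scheduleAt).getTime()
  if (Number.isNaN(target)) {
    throw new RangeError(`Invalid scheduleAt: ${scheduleAt}`)
  }
  return Math.max(0, target - now)
}

// Hypothetical deterministic job ID: the same user requesting the same report
// with the same parameters gets the same ID, so a duplicate add() is skipped
// while the earlier job is still stored. JSON.stringify is sensitive to key order.
function reportJobId(userId, reportType, parameters) {
  const digest = createHash('sha256')
    .update(JSON.stringify([userId, reportType, parameters]))
    .digest('hex')
    .slice(0, 16)
  return `${userId}-${reportType}-${digest}`
}

console.log(backoffSchedule(3, 2000)) // [ 2000, 4000 ]
```

With removeOnComplete: 100 keeping finished jobs around, a deterministic ID also blocks a legitimate repeat request for as long as the earlier job is still stored in Redis, so this scheme suits "the client submitted twice" better than "the user wants a fresh copy".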
Job Scheduling: Delayed and Recurring Jobs
// `data` is your job payload and `redisConfig` your Bull options object
// (e.g. { redis: { host: 'localhost', port: 6379 } }); run inside an async function

// Delayed job: run after a fixed delay
await reportQueue.add(data, {
  delay: 3600000 // 1 hour, in milliseconds
})

// Schedule for a specific time (a time in the past runs immediately)
const scheduledTime = new Date('2026-06-06T10:00:00Z')
await reportQueue.add(data, {
  delay: Math.max(0, scheduledTime.getTime() - Date.now())
})

// Recurring jobs (Bull repeatable jobs)
const dailyCleanup = new Queue('cleanup', redisConfig)
// Run every day at midnight, New York time
await dailyCleanup.add({ task: 'cleanup-logs' }, {
  repeat: {
    cron: '0 0 * * *',
    tz: 'America/New_York'
  }
})
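A cron repeat like '0 0 * * *' boils down to "find the next matching instant, then wait until then". For the simplest case, a daily run at a fixed UTC hour, that computation fits in a few lines. nextDailyRunUtc is a hypothetical helper that deliberately ignores time zones and daylight saving time, which is exactly what Bull's tz option handles for you.

```javascript
// Next occurrence of HH:00 UTC strictly after `now`, plus the delay until it.
// Illustrates what a daily cron schedule computes; no time zones or DST.
function nextDailyRunUtc(hourUtc, now = new Date()) {
  const next = new Date(Date.UTC(
    now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate(), hourUtc
  ))
  if (next.getTime() <= now.getTime()) {
    // Today's slot has passed (or is right now): move to tomorrow.
    // setUTCDate rolls over month and year boundaries.
    next.setUTCDate(next.getUTCDate() + 1)
  }
  return { next, delayMs: next.getTime() - now.getTime() }
}

const { next, delayMs } = nextDailyRunUtc(0, new Date('2026-06-06T09:30:00Z'))
console.log(next.toISOString(), delayMs) // 2026-06-07T00:00:00.000Z 52200000
```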
Queue Monitoring: Know What's Happening
Bull Arena (web UI for Bull queues):
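const Arena = require('bull-arena')
const Bull = require('bull')

// Recent bull-arena versions expect the queue library to be passed in
Arena({
  Bull,
  queues: [
    {
      name: 'report-generation',
      hostId: 'Redis', // Display label for this Redis host in the UI
      redis: { host: 'localhost', port: 6379 }
    },
    {
      name: 'email-sending',
      hostId: 'Redis',
      redis: { host: 'localhost', port: 6379 }
    }
  ]
})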
Metrics to watch:
* Queue depth: how many jobs are waiting? If it keeps growing, workers can't keep up.
* Active jobs: how many are processing right now?
* Failed jobs: the rate of failures and the types of failures.
* Job duration: how long does each job take? Are there outliers?
* Stalled jobs: are workers crashing, leaving jobs picked up but never completed?
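The first three metrics can be checked from two consecutive snapshots of job counts. The sketch assumes the snapshot shape of Bull's getJobCounts() (an object with waiting, active, completed, failed and delayed counts); checkQueueHealth and its thresholds are hypothetical and need tuning per queue. A single rise in depth is noisy, so real alerting usually looks for growth across several samples.

```javascript
// Compare two snapshots of queue counts and return human-readable alerts.
// Thresholds are illustrative defaults, not recommendations.
function checkQueueHealth(prev, curr, { maxWaiting = 1000, maxNewFailures = 10 } = {}) {
  const alerts = []
  if (curr.waiting > maxWaiting) {
    alerts.push(`backlog: ${curr.waiting} jobs waiting`)
  }
  if (prev && curr.waiting > prev.waiting) {
    alerts.push(`queue depth growing: ${prev.waiting} -> ${curr.waiting}`)
  }
  // `failed` counts jobs currently kept in the failed set, so the difference
  // between samples only approximates new failures (removeOnFail trims it)
  const newFailures = prev ? curr.failed - prev.failed : 0
  if (newFailures > maxNewFailures) {
    alerts.push(`${newFailures} new failed jobs since last sample`)
  }
  return alerts
}

const before = { waiting: 100, active: 5, completed: 900, failed: 2, delayed: 0 }
const after = { waiting: 1500, active: 5, completed: 950, failed: 20, delayed: 0 }
console.log(checkQueueHealth(before, after))
```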
Backpressure: When Workers Can't Keep Up
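Backpressure problems start when producers push jobs faster than consumers can process them. The queue absorbs the difference for a while, but every unprocessed job takes up memory or disk on the broker, so the backlog cannot grow forever.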
Symptoms:
* Queue depth grows continuously
* Job latency increases (jobs wait longer before processing starts)
* Memory usage on Redis/the broker increases
* Eventually the broker runs out of memory or hits storage limits (RabbitMQ, for example, blocks publishing connections when its memory or disk alarm fires)
Solutions:
* Add more workers (horizontal scaling)
* Rate limit producers
* Drop lower-priority jobs
* Use a dead letter queue for jobs that can't be processed
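// Bull's limiter throttles how fast workers *process* jobs (here: at most 100
// per second). It protects downstream services but does NOT cap queue length;
// throttling consumers actually makes a backlog grow faster.
const queue = new Queue('reports', {
  limiter: {
    max: 100, // Max jobs processed per duration
    duration: 1000 // Duration window in milliseconds
  }
})

// Alert on backlog by polling the job counts
setInterval(async () => {
  try {
    const count = await queue.getJobCounts()
    if (count.waiting > 1000) {
      console.error('Queue backlog critical:', count.waiting)
      alertOperations('Queue backlog critical') // Your alerting hook
    }
  } catch (err) {
    console.error('Could not read queue counts:', err.message) // e.g. Redis is down
  }
}, 30000)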
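The other half of backpressure is refusing work at the door. A hypothetical admission check (admitJob, with made-up limits) could run in the producer before add(), using the waiting count from getJobCounts() as in the polling loop above: past a soft limit it sheds low-priority jobs (larger numbers, since 1 is Bull's highest priority), and past a hard limit it refuses everything with a 503 the client can retry later.

```javascript
// Decide whether to accept a new job given the current backlog.
// Priority follows Bull's convention: 1 is highest, larger numbers are lower.
function admitJob(waitingCount, priority, {
  softLimit = 1000,    // start shedding low-priority work here
  hardLimit = 5000,    // refuse everything here
  lowPriorityFrom = 10 // priorities >= this count as low
} = {}) {
  if (waitingCount >= hardLimit) {
    return { admit: false, status: 503, reason: 'queue full' }
  }
  if (waitingCount >= softLimit && priority >= lowPriorityFrom) {
    return { admit: false, status: 503, reason: 'shedding low-priority jobs' }
  }
  return { admit: true }
}

console.log(admitJob(1200, 10).admit) // false
console.log(admitJob(1200, 1).admit)  // true
```

Two producers can both read a count just under a limit and both enqueue, so these limits are soft; that is usually acceptable for load shedding. A route using this would respond with the returned status and, ideally, a Retry-After header.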
Security: Don't Let Anyone Publish to Your Queue
Queue authentication:
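# RabbitMQ with TLS + credentials (docker-compose.yml)
services:
  rabbitmq:
    image: rabbitmq:4-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
    ports:
      - "5671:5671" # TLS port only; 5672 and 15672 are not published
    volumes:
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      # The certificate files referenced in rabbitmq.conf must be mounted too
      - ./certs/ca.pem:/etc/rabbitmq/ca.pem:ro
      - ./certs/server.pem:/etc/rabbitmq/server.pem:ro
      - ./certs/key.pem:/etc/rabbitmq/key.pem:ro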
# rabbitmq.conf: TLS-only listener that requires client certificates (mutual TLS)
listeners.tcp = none
listeners.ssl.default = 5671
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
ssl_options.cacertfile = /etc/rabbitmq/ca.pem
ssl_options.certfile = /etc/rabbitmq/server.pem
ssl_options.keyfile = /etc/rabbitmq/key.pem
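With fail_if_no_peer_cert = true, every client must present its own certificate. amqplib passes extra socket options through to Node's TLS layer, so a producer or worker connects with an amqps:// URL plus its certificate, key and CA. tlsSocketOptions, connectSecure, the file names, and the host rabbitmq.internal are hypothetical; the file reader is injectable only so the helper can be exercised without real certificates.

```javascript
const fs = require('fs')

// Hypothetical helper: socket options for mutual TLS, which amqplib hands to Node's TLS layer
function tlsSocketOptions({ certPath, keyPath, caPath }, readFile = fs.readFileSync) {
  return {
    cert: readFile(certPath), // This client's certificate
    key: readFile(keyPath),   // This client's private key
    ca: [readFile(caPath)]    // CA that signed the broker's certificate
  }
}

// Connect over TLS (needs amqplib installed and a reachable broker).
// amqplib is required lazily so the helper above works without it.
async function connectSecure() {
  const amqp = require('amqplib')
  const password = encodeURIComponent(process.env.RABBITMQ_PASSWORD)
  return amqp.connect(
    `amqps://producer:${password}@rabbitmq.internal:5671`,
    tlsSocketOptions({ certPath: 'client.pem', keyPath: 'client-key.pem', caPath: 'ca.pem' })
  )
}
```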
Queue permissions (per user: configure, write and read regexes):
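# set_permissions -p <vhost> <user> <configure> <write> <read>
# Producer: declare the queue and publish through the default exchange (amq.default)
rabbitmqctl set_permissions -p / producer "^report-generation$" "^amq\.default$" ""
# Consumer: declare and read the queue; write on amq.default only if the worker republishes jobs (e.g. for retries)
rabbitmqctl set_permissions -p / consumer "^report-generation$" "^amq\.default$" "^report-generation$"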
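RabbitMQ permissions are three regular expressions per user and vhost, in the order configure, write, read. Declaring a queue needs configure on the queue, publishing needs write on the exchange (the default exchange is named amq.default for permission checks), and consuming needs read on the queue. The toy checker below models just that matching step; allowed and the two minimal permission sets are illustrative, and the real broker also considers vhosts, topic permissions and more.

```javascript
// Toy model of RabbitMQ permission matching: configure, write and read are
// each a regex tested against a resource name; '' grants nothing (same as '^$').
function allowed(perms, kind, resourceName) {
  const pattern = perms[kind]
  if (!pattern) return false
  return new RegExp(pattern).test(resourceName)
}

// Illustrative minimal permission sets for hypothetical users
const producer = { configure: '^report-generation$', write: '^amq\\.default$', read: '' }
const consumer = { configure: '^report-generation$', write: '', read: '^report-generation$' }

console.log(allowed(producer, 'configure', 'report-generation')) // true  (queue.declare)
console.log(allowed(producer, 'write', 'amq.default'))           // true  (basic.publish)
console.log(allowed(consumer, 'read', 'report-generation'))      // true  (basic.consume)
console.log(allowed(producer, 'read', 'report-generation'))      // false (producer can't consume)
```

Anchoring each pattern with ^ and $ matters: an unanchored 'report-generation' would also match a queue named report-generation-archive.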
Never expose queue ports to the internet:
* The RabbitMQ management port 15672 should be internal only
* Redis port 6379 should never be publicly accessible
* Queue connections should use TLS in production