RabbitMQ Work Queues với Node.js: Xử lý tác vụ nền không làm nghẽn API
Bạn đã bao giờ gặp tình huống API /api/order bỗng dưng chậm rề rề vì phải gửi email xác nhận, resize ảnh đại diện hay sinh báo cáo Excel chưa? Những tác vụ “nặng” này nếu xử lý đồng bộ sẽ chiếm dụng luồng xử lý, khiến người dùng phải chờ lâu, thậm chí request timeout.
RabbitMQ Work Queues là giải pháp kinh điển: tách việc nhận request (API) và xử lý nặng (background worker) qua một hàng đợi trung gian. Trong bài viết này, tôi sẽ hướng dẫn bạn triển khai Work Queues trong Node.js từ A đến Z, bao gồm các kỹ thuật nâng cao như Durable, Message Acknowledgment và Fair Dispatch – những thứ thường bị bỏ qua trong các bài viết tiếng Việt.
📌 Trước khi bắt đầu, nếu bạn chưa biết khi nào nên chọn RabbitMQ thay vì Kafka, hãy ghé thăm bài Kafka vs RabbitMQ: So Sánh Chi Tiết & Cách Chọn Message Broker (2026) để có cái nhìn tổng quan nhé.
Work Queues là gì? Tại sao API của bạn cần nó?

Message broker (trung gian tin nhắn) là một hệ thống giúp các dịch vụ giao tiếp với nhau một cách bất đồng bộ. RabbitMQ là một message broker phổ biến, hoạt động theo giao thức AMQP. Work Queues (còn gọi là Task Queues) là mô hình trong đó một bên gửi tin nhắn (producer) đẩy tác vụ vào hàng đợi, và nhiều bên nhận (consumers / workers) sẽ lấy tác vụ ra để xử lý. Mỗi tác vụ chỉ được một worker xử lý duy nhất.
Vấn đề điển hình không dùng hàng đợi
Giả sử API đặt hàng của bạn:
// API gửi email trực tiếp
app.post('/order', async (req, res) => {
await saveOrder(req.body);
await sendEmailConfirmation(req.body.email); // mất 2-3 giây
await resizeAvatar(req.body.userId); // mất 1-2 giây
res.json({ success: true });
});
- Thời gian phản hồi = tổng thời gian xử lý đồng bộ.
- Không mở rộng được: nếu lượng đơn hàng tăng đột biến, API sẽ quá tải.
- Mất dữ liệu khi crash: email chưa gửi xong mà Node.js crash thì coi như mất luôn.
Giải pháp với RabbitMQ
[API (Producer)] -> [Queue] -> [Worker 1] -> gửi email
-> [Worker 2] -> resize ảnh
-> ...
- API chỉ cần đẩy task vào queue (rất nhanh) và trả về ngay.
- Worker chạy riêng, xử lý lần lượt.
- RabbitMQ lưu task an toàn, sẵn sàng giao lại nếu worker gặp sự cố.
Cài đặt và kết nối RabbitMQ với amqplib
Trước hết, bạn cần RabbitMQ server. Cách nhanh nhất là dùng Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
- Cổng
5672: giao thức AMQP (cho app kết nối). - Cổng
15672: Management UI (truy cậphttp://localhost:15672, user/pass:guest/guest).
Tạo project Node.js
mkdir rabbitmq-work-queue
cd rabbitmq-work-queue
npm init -y
npm install amqplib
⚠️ Verification risk: mã nguồn trong bài được viết với amqplib phiên bản ^0.10.4. API cơ bản không thay đổi nhiều qua các phiên bản, nhưng bạn nên kiểm lại npm amqplib nếu dùng phiên bản mới hơn.
Producer (Task Sender) – hoàn chỉnh
Tạo file producer.js – nhiệm vụ: đọc task từ command line, gửi vào queue với cơ chế durable và persistent.
// producer.js
const amqp = require('amqplib');
const QUEUE_NAME = 'task_queue';
async function sendTask(task) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// khai báo queue DURABLE - tồn tại sau khi RabbitMQ restart
await channel.assertQueue(QUEUE_NAME, { durable: true });
// chuyển task thành Buffer và gửi với option persistent
const message = JSON.stringify({ task, timestamp: Date.now() });
const sent = channel.sendToQueue(QUEUE_NAME, Buffer.from(message), { persistent: true });
console.log(`[x] Sent: ${task}`);
if (!sent) {
// Channel buffer đầy, đợi drain event (hiếm khi xảy ra với queue thường)
await new Promise(resolve => channel.once('drain', resolve));
}
// Đóng kết nối sau khi đảm bảo message đã được gửi
setTimeout(() => {
connection.close();
process.exit(0);
}, 1000); // đợi 1s thay vì 500ms để an toàn hơn
}
// Lấy task từ dòng lệnh: node producer.js "send email to user@example.com"
const task = process.argv.slice(2).join(' ') || 'default task';
sendTask(task).catch(console.error);
Giải thích code:
assertQueue({ durable: true }): queue không bị xóa khi RabbitMQ restart.{ persistent: true }: yêu cầu RabbitMQ ghi message xuống disk (tối đa có thể). Kết hợp với queue durable giúp task không bị mất khi server gặp sự cố.- Kiểm tra
!sentvà lắng nghe'drain'để đảm bảo message không bị drop do buffer đầy. - Đợi 1 giây trước khi đóng kết nối – đủ để RabbitMQ xác nhận nhận message.
Consumer (Worker) – hoàn chỉnh, an toàn với try/catch
Tạo file worker.js – nhận task, xử lý và gửi acknowledgment sau khi hoàn thành, đảm bảo luôn gửi ack hoặc xử lý lỗi đúng cách.
// worker.js
const amqp = require('amqplib');
const QUEUE_NAME = 'task_queue';
async function startWorker() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Khai báo lại queue (nếu chưa có) với durable
await channel.assertQueue(QUEUE_NAME, { durable: true });
// Fair Dispatch: chỉ gửi tối đa 1 task cho worker tại một thời điểm
channel.prefetch(1);
console.log(' [*] Waiting for tasks. To exit press CTRL+C');
// Xử lý mất kết nối
connection.on('error', (err) => {
console.error('Connection error:', err);
setTimeout(startWorker, 5000); // thử reconnect sau 5s
});
// Nhận task từ queue
channel.consume(QUEUE_NAME, async (msg) => {
if (msg !== null) {
const content = JSON.parse(msg.content.toString());
console.log(` [x] Received: ${content.task}`);
try {
// Giả lập xử lý task nặng (resize ảnh, gửi email, ...)
await simulateWork(content.task);
console.log(` [x] Done: ${content.task}`);
// ✅ Gửi ack – báo RabbitMQ đã xử lý xong, có thể xóa message
channel.ack(msg);
} catch (err) {
console.error(` [x] Failed: ${content.task}`, err);
// Xử lý lỗi: vẫn ack để xóa message khỏi queue (tránh requeue mãi)
// Nếu muốn retry, nên gửi vào Dead Letter Exchange hoặc nack với requeue = false
channel.ack(msg);
// Hoặc dùng channel.nack(msg, false, false) để xóa và không requeue
}
}
});
}
// Hàm giả lập công việc nặng (thực tế có thể là gọi API, xử lý file, ...)
function simulateWork(task) {
return new Promise((resolve, reject) => {
// Giả sử mỗi dấu chấm trong task tương ứng 1 giây
const duration = (task.match(/\./g) || []).length * 1000 || 1000;
if (duration > 10000) {
reject(new Error('Task quá dài, từ chối xử lý')); // ví dụ lỗi
}
setTimeout(resolve, duration);
});
}
startWorker().catch(console.error);
Giải thích code:
channel.prefetch(1): quan trọng – chỉ lấy 1 task mỗi lần, đảm bảo phân phối công bằng.connection.on('error', ...): tái kết nối nếu mất RabbitMQ.try/catchbao quanh xử lý: nếu có lỗi, vẫn gọiackđể xóa message khỏi queue (tránh requeue vô hạn). Trong thực tế, bạn có thể gửi message lỗi vào Dead Letter Queue.- Nếu muốn requeue khi lỗi tạm thời, dùng
channel.nack(msg, false, true)nhưng cẩn thận với vòng lặp vô hạn.
Message Acknowledgment: Đừng để mất Task

Trong phiên bản đơn giản nhất (không ack), RabbitMQ sẽ xóa message ngay sau khi gửi đến worker. Nếu worker chết giữa chừng, task sẽ biến mất vĩnh viễn.
Với Message Acknowledgment (cơ chế ack):
- Worker gửi ack khi xử lý xong.
- Nếu worker crash (hoặc không gửi ack), RabbitMQ sẽ requeue – gửi lại task cho worker khác.
⚠️ Lỗi thường gặp #1: Quên gửi Acknowledgment
// SAI - thiếu channel.ack(msg)
channel.consume(QUEUE_NAME, async (msg) => {
await doTask(msg);
// quên ack ==> RabbitMQ sẽ giữ message mãi, bộ nhớ tăng vô hạn
});
Hậu quả: RabbitMQ coi message vẫn “đang xử lý”, không bao giờ xóa, cuối cùng tràn bộ nhớ.
Cách sửa: Luôn gọi channel.ack(msg) trong finally:
channel.consume(QUEUE_NAME, async (msg) => {
try {
await doTask(msg);
} catch (err) {
// log lỗi
} finally {
channel.ack(msg); // đảm bảo ack
}
});
Fair Dispatch: Phân phối việc thông minh

Mặc định, RabbitMQ phân phối message theo vòng tròn (round-robin), bất kể worker có nhanh hay chậm. Ví dụ: Worker A xử lý nhanh (1 giây), Worker B xử lý chậm (10 giây) thì round-robin vẫn gửi cho cả hai luân phiên, dẫn đến task chờ đợi ở worker B ngày càng nhiều.
Fair Dispatch giải quyết bằng prefetch(1). Hãy xem lại dòng code này:
channel.prefetch(1);
Nó yêu cầu RabbitMQ chỉ gửi tối đa 1 message chưa được ack cho worker. Worker nào ack nhanh sẽ nhận task mới ngay, worker chậm không bị nhồi.
Thử nghiệm so sánh
- Round-robin (mặc định): 10 task, 1 worker nhanh (1s), 1 worker chậm (5s) → worker chậm tích tụ 5 task trong khi worker nhanh chỉ làm được 5 task (lãng phí).
- Fair Dispatch với prefetch(1): worker nhanh sẽ làm ~9 task, worker chậm chỉ 1 task, tổng thời gian hoàn thành nhanh hơn.
🔑 Best practice: Luôn đặt prefetch(1) cho Work Queues trừ khi bạn có lý do đặc biệt.
Durable & Persistent: Đảm bảo task không mất khi RabbitMQ restart
| Cấp độ | Cấu hình | Tác dụng |
|---|---|---|
| Queue durable | assertQueue(..., { durable: true }) |
Queue tồn tại sau khi RabbitMQ restart |
| Message persistent | sendToQueue(..., { persistent: true }) |
Message được ghi xuống disk, không mất khi restart |
Phối hợp cả hai → task an toàn tối đa (trừ trường hợp disk bị lỗi).
Lỗi thường gặp #2: Chỉ đánh dấu queue durable nhưng quên persistent
// Không đủ an toàn
channel.sendToQueue(queue, Buffer.from(msg)); // persistent mặc định false
Khi RabbitMQ restart, queue vẫn còn nhưng tất cả message chưa được ack sẽ biến mất.
Best Practices khi dùng Work Queues trong thực tế
- Luôn đánh dấu queue
durablevà messagepersistent– trừ khi bạn chấp nhận mất task khi deploy. - Sử dụng
prefetch(1)để phân phối thông minh. - Gửi ack sau khi xử lý thành công, dùng
try/finallyđể đảm bảo. - Xử lý lỗi và dùng Dead Letter Exchanges (DLX) cho các task thất bại sau nhiều lần retry.
- Ví dụ: tạo queue
task_queue_dlq, gánx-dead-letter-exchangeđể chuyển task lỗi sang hàng đợi riêng.
- Ví dụ: tạo queue
- Giữ task nhẹ và có timeout. Nếu task có thể chạy vô hạn, hãy thiết kế cơ chế heartbeat.
- Theo dõi độ dài hàng đợi qua RabbitMQ Management UI hoặc API.
- Tái kết nối khi mất kết nối với RabbitMQ (xem ví dụ trong
worker.js).
Common Errors & Cách khắc phục
| Lỗi | Nguyên nhân | Cách sửa |
|---|---|---|
| Queue bị xóa sau khi restart | Không khai báo durable: true |
assertQueue(name, { durable: true }) |
| Message bị mất khi restart server | persistent: false (mặc định) |
Thêm { persistent: true } |
| Worker bị nhồi quá nhiều task | Không dùng prefetch |
channel.prefetch(1) |
| Memory leak trong RabbitMQ | Worker quên ack | Luôn gọi ack trong finally |
| Kết nối bị đóng đột ngột | Thiếu xử lý error event | Lắng nghe connection.on('error', ...) và reconnect |
FAQ
1. RabbitMQ có nhanh hơn Kafka không?
Không thể so sánh trực tiếp. RabbitMQ tối ưu cho xử lý hàng đợi task (each message xử lý một lần, ưu tiên độ tin cậy và tính năng routing). Kafka tối ưu cho luồng dữ liệu lớn, replay log, throughput cao nhưng không hỗ trợ ack theo từng message giống RabbitMQ. Đọc thêm bài so sánh chi tiết: Kafka vs RabbitMQ.
2. Làm sao để monitor các hàng đợi đang chạy?
- RabbitMQ Management Plugin (cổng 15672): xem số lượng ready, unacked messages, tốc độ.
- CLI:
rabbitmqctl list_queues name messages_ready messages_unacknowledged - API HTTP:
GET /api/queuestrả về JSON. - Prometheus + Grafana dành cho hệ thống lớn.
3. Tôi có thể dùng Redis thay cho RabbitMQ không?
Có, với Redis List hoặc Pub/Sub, nhưng RabbitMQ có ack, dead letter, routing, persistent mạnh mẽ hơn. Redis phù hợp cho cache hoặc hàng đợi đơn giản, không yêu cầu độ tin cậy tuyệt đối.
4. Làm thế nào để scale số lượng worker?
Chạy thêm nhiều instance của file worker.js. RabbitMQ tự động phân phối task cho tất cả consumer đang kết nối cùng queue. Với prefetch(1), worker mới tham gia sẽ nhận task ngay.
Tài liệu tham khảo chính thức
Kết luận
Bạn đã thấy cách dùng RabbitMQ Work Queues trong Node.js để xử lý các tác vụ nặng (gửi email, resize ảnh, sinh báo cáo) mà không làm nghẽn API. Quan trọng nhất là:
- Queue durable + message persistent để không mất task.
- Acknowledgment để đảm bảo task được xử lý thành công.
- Prefetch(1) để phân phối thông minh.
- Try/catch + ack trong finally để tránh lỗi và memory leak.
Hy vọng bạn áp dụng thành công vào hệ thống của mình. Nếu muốn tìm hiểu sâu hơn về cách RabbitMQ so sánh với Kafka trong các hệ thống event-driven, đừng quên đọc bài Kafka vs RabbitMQ: So Sánh Chi Tiết & Cách Chọn Message Broker (2026) nhé!
Chúc bạn code vui vẻ và đừng quên prefetch(1)