Redis Pub/Sub và Streams trong Node.js: Hướng dẫn Real-time & Event-driven

VMas-Dev-KA

Redis Pub/Sub & Streams: Giải pháp cho hệ thống Real-time và Event-driven

Bạn đang xây dựng một hệ thống chat, thông báo đẩy hay xử lý sự kiện theo thời gian thực? Redis thường là lựa chọn đầu tiên nhờ tốc độ trong bộ nhớ. Tuy nhiên, nhiều developer vẫn nhầm lẫn giữa Pub/SubStreams – dẫn đến mất tin nhắn hoặc thiết kế sai kiến trúc.

Trong bài viết này, chúng ta sẽ:

  • Hiểu cơ chế hoạt động của Pub/Sub và Streams.
  • So sánh chi tiết để chọn đúng công cụ cho bài toán event-driven.
  • Thực hành xây dựng chat server đơn giản với Pub/Sub trong Node.js.
  • Xử lý tin nhắn theo nhóm an toàn với Streams.

📌 Lưu ý quan trọng từ đầu: Pub/Sub trong Redis không lưu lại tin nhắn nếu subscriber bị ngắt kết nối. Hãy cân nhắc kỹ trước khi dùng cho các tác vụ quan trọng cần độ tin cậy cao.


1. Pub/Sub là gì? Mô hình “Phát loa” thông tin

Redis Pub/Sub: publisher gửi tin, subscriber online nhận, offline mất tin

Redis Pub/Sub (Publish/Subscribe) là mô hình messaging mà ở đó:

  • Publisher gửi tin nhắn đến một channel (kênh).
  • Subscriber lắng nghe trên channel đó và nhận ngay lập tức.
  • Không có lưu trữ – nếu subscriber chưa kết nối hoặc bị ngắt, tin nhắn sẽ biến mất vĩnh viễn.

🔍 Cơ chế hoạt động

Khi một publisher thực hiện lệnh PUBLISH channel "Hello", Redis sẽ:

  1. Tìm tất cả subscriber đang kết nối vào channel.
  2. Gửi trực tiếp message đến từng client.
  3. Nếu không có subscriber nào, message bị hủy ngay lập tức.

Điều này mang lại hiệu năng cực cao (microsecond), nhưng cũng là con dao hai lưỡi: At-most-once delivery (mỗi tin gửi tối đa một lần, có thể không lần nào).

✅ Khi nào nên dùng Pub/Sub?

  • Cập nhật trạng thái real-time (ví dụ: bảng xếp hạng live, giá coin).
  • Gửi thông báo tức thời mà không cần lưu lại khi user offline.
  • Scale Socket.IO lên nhiều server (xem thêm bài viết về Socket.io Chat Real-time).

❌ Khi nào tuyệt đối không dùng Pub/Sub?

  • Hệ thống thông báo quan trọng (ví dụ: xác nhận thanh toán, reset mật khẩu).
  • Lưu lại lịch sử chat hay audit log.
  • Xử lý sự kiện mà các service có thể down.

📄 Xác nhận từ tài liệu chính thức: Redis Pub/Sub documentation khẳng định cơ chế “fire and forget”, không có bất kỳ cơ chế xác nhận hay lưu trữ nào. (Nguồn: Redis Pub/Sub)


2. Redis Streams: Khi bạn cần lịch sử sự kiện

Consumer group trong Redis Streams giúp nhiều worker xử lý song song và xác nhận thành công

Redis Streams (xuất hiện từ Redis 5.0, phổ biến ở version 7.x) được thiết kế để giải quyết điểm yếu của Pub/Sub: lưu trữ và phát lại sự kiện. Nó hoạt động giống một log chỉ ghi thêm (append-only log).

🧩 Consumer Groups – “Chia việc” cho nhiều consumer

Điểm mạnh nhất của Streams so với Pub/Sub là Consumer Group:

  • Nhiều consumer (instance xử lý) cùng đọc dữ liệu từ một stream.
  • Mỗi message chỉ được giao cho một consumer trong nhóm (tránh xử lý trùng).
  • Hỗ trợ acknowledgment (ACK): Sau khi xử lý xong, consumer gửi ACK, nếu không, message sẽ được gửi lại.

Cách này gần giống Kafka nhưng nhẹ hơn, phù hợp với hệ thống vừa và nhỏ.

✨ Tính năng nổi bật

  • Mỗi message có ID tự động tăng theo thời gian.
  • Duy trì lịch sử bất kể có consumer đang online hay không.
  • Có thể đọc từ đầu, từ một ID cụ thể, hoặc chỉ đọc tin mới.
  • Hỗ trợ block (chờ tin mới) hiệu quả.

✅ Khi nào dùng Redis Streams?

  • Hệ thống xử lý đơn hàng, thông báo cần đảm bảo không mất.
  • Chat cần lưu lại lịch sử và cho phép đọc lại.
  • Pipeline ETL nhẹ, queue công việc (job queue).
  • Cần phát lại sự kiện khi có service mới tham gia.

3. So sánh: Khi nào chọn Pub/Sub, khi nào chọn Streams?

Tiêu chí Pub/Sub Streams
Lưu trữ tin nhắn ❌ Không lưu ✅ Lưu đến khi bị xóa thủ công / giới hạn maxlen
Phát lại tin cũ ❌ Không thể ✅ Có thể đọc từ bất kỳ ID nào
Người dùng offline có nhận được không? ❌ Mất ngay ✅ Có – khi online sẽ đọc tiếp
Consumer Groups (chia việc) ❌ Không, tất cả subscriber cùng nhận ✅ Có, mỗi message chỉ một consumer
Acknowledgment ❌ Không ✅ Có – đảm bảo xử lý thành công
Hiệu năng Cực nhanh (microsecond) Nhanh (nhưng có chi phí lưu log)
Trường hợp điển hình Real-time broadcast, cập nhật số liệu Xử lý sự kiện đáng tin cậy, job queue

🧠 Tóm gọn nguyên tắc:

  • Cần “lưu vết” hay đảm bảo giao hàng?Streams.
  • Chỉ cần “phát loa” thông tin nhạy cảm thời gian thực, không quan trọng?Pub/Sub.

4. Thực hành: Chat Server đơn giản với Pub/Sub (Node.js)

Mặc dù Streams mạnh hơn, Pub/Sub vẫn rất phổ biến cho các tính năng như broadcast tin nhắn chat. Hãy xây dựng ví dụ tối giản.

Cài đặt thư viện: npm install ioredis

4.1 Publisher / Subscriber cơ bản

// subscriber.js
const Redis = require('ioredis');
const subscriber = new Redis(); // kết nối đến localhost:6379

subscriber.subscribe('chat-room-1', (err, count) => {
  if (err) console.error(err);
  else console.log(`Đã subscribe vào ${count} kênh`);
});

subscriber.on('message', (channel, message) => {
  console.log(`Nhận từ ${channel}: ${message}`);
});
// publisher.js
const Redis = require('ioredis');
const publisher = new Redis();

async function sendMessage() {
  await publisher.publish('chat-room-1', 'Xin chào cả lớp!');
  console.log('Đã gửi tin nhắn');
  process.exit(0);
}

sendMessage();

Giải thích:

  • subscriber.subscribe() đăng ký lắng nghe kênh chat-room-1.
  • publisher.publish() gửi tin nhắn đến kênh đó.
  • Mọi subscriber đang online sẽ nhận được ngay.

⚠️ Lỗi thường gặp: Nếu subscriber.js chạy sau khi publisher.js gửi tin, bạn sẽ không nhận được gì. Đây chính là hạn chế lớn nhất của Pub/Sub.

4.2 Xử lý lỗi mất kết nối

Trong thực tế, bạn nên thêm logic reconnect và kiểm tra trạng thái:

const subscriber = new Redis({
  retryStrategy: (times) => Math.min(times * 50, 2000)
});

subscriber.on('error', (err) => console.error('Redis error', err));
subscriber.on('reconnecting', () => console.log('Đang kết nối lại...'));

5. Thực hành nâng cao: XReadGroup với Redis Streams (Consumer Groups)

Ví dụ này mô phỏng hệ thống gửi email hàng loạt – nhiều worker cùng xử lý, không ai làm trùng.

5.1 Tạo Stream và Consumer Group

const Redis = require('ioredis');
const client = new Redis();

async function init() {
  // Xoá group cũ nếu có (chạy lần đầu)
  try {
    await client.xgroup('DESTROY', 'email-stream', 'email-workers');
  } catch(e) {}

  // Tạo stream và group – MKSTREAM tự tạo nếu chưa tồn tại
  await client.xgroup('CREATE', 'email-stream', 'email-workers', '$', { mkstream: true });
  console.log('Group đã sẵn sàng');
}
init();

5.2 Producer: Thêm tin nhắn vào Stream

async function addEmailTask(to, subject) {
  const id = await client.xadd('email-stream', '*', 'to', to, 'subject', subject);
  console.log(`Đã thêm task: ${id}`);
}

// Thử thêm vài task
addEmailTask('user1@example.com', 'Chào mừng');
addEmailTask('user2@example.com', 'Khuyến mãi');

5.3 Consumer với XReadGroup và ACK

Mỗi worker sẽ đọc từ group, xử lý, và gửi ACK.

async function startWorker(workerId) {
  console.log(`Worker ${workerId} bắt đầu`);

  while (true) {
    try {
      // Đọc 1 message từ group, block tối đa 2 giây nếu không có
      const result = await client.xreadgroup(
        'GROUP', 'email-workers', workerId,
        'BLOCK', 2000,
        'COUNT', 1,
        'STREAMS', 'email-stream', '>'
      );

      if (!result) {
        // Hết tin nhắn, tiếp tục vòng lặp
        continue;
      }

      const [, messages] = result[0];
      for (const [id, fields] of messages) {
        // fields là [key, value, key, value...] -> chuyển thành object
        const email = {};
        for (let i = 0; i < fields.length; i += 2) {
        email[fields[i]] = fields[i+1];
        }

        console.log(`Worker ${workerId} xử lý email đến ${email.to}`);
        // Giả lử xử lý gửi email
        await new Promise(r => setTimeout(r, 500));

        // Xác nhận đã xử lý xong
        await client.xack('email-stream', 'email-workers', id);
        console.log(`Worker ${workerId} đã ACK cho ${id}`);
      }
    } catch (err) {
      console.error('Lỗi worker:', err);
    }
  }
}

// Chạy 2 worker cùng lúc
startWorker('worker-1');
startWorker('worker-2');

Giải thích:

  • xreadgroup với > có nghĩa là “chỉ đọc những tin nhắn chưa được giao cho bất kỳ consumer nào trong group”.
  • Sau khi xử lý xong, gọi xack để Redis biết rằng message đã được xử lý thành công.
  • Nếu worker bị crash trước khi ack, Redis sẽ giao message đó cho worker khác sau thời gian timeout (mặc định 2 phút nhưng có thể cấu hình).

6. Lỗi thường gặp và cách khắc phục

Redis Pub/Sub mất tin nhắn nếu subscriber kết nối sau khi tin được gửi

Lỗi Nguyên nhân Hướng xử lý
Mất tin nhắn khi subscriber offline Pub/Sub không lưu trữ Chuyển sang Redis Streams hoặc RabbitMQ nếu cần độ tin cậy cao
Streams bị tràn bộ nhớ Quên cài MAXLEN Dùng XADD ... MAXLEN ~ 10000 để giới hạn độ dài
Consumer group không nhận được tin Group chưa tạo hoặc dùng $ (chỉ tin mới sau khi tạo) Tạo group với nếu muốn đọc lại từ đầu
Kết nối Redis bị timeout Không xử lý lỗi mạng Thiết lập retryStrategyenableAutoPipelining

7. Best Practices khi dùng Pub/Sub & Streams trong Node.js

Dùng namespace cho channel
Ví dụ: chat:room:101, notify:user:2024 – tránh xung đột.

Sử dụng bộ đệm (buffer) nếu lượng message quá lớn
Nếu publisher gửi 10k tin/giây, consumer không kịp xử lý, hãy thêm queue trong bộ nhớ hoặc dùng stream với COUNT hợp lý.

Giám sát bộ nhớ
Redis Streams lưu trên RAM. Dùng XTRIM hoặc MAXLEN để giới hạn.

Xử lý ACK đúng nơi
Chỉ gọi XACK sau khi xử lý xong hoàn toàn. Nếu gọi sớm mà xử lý thất bại, tin nhắn sẽ mất.

Không dùng Pub/Sub cho các tác vụ quan trọng
Hãy nhớ câu thần chú: Pub/Sub không hứa hẹn giao hàng tận nơi.


8. Câu hỏi thường gặp (FAQ)

❓ Redis Streams có thay thế được Kafka không?

Trả lời:
Không hoàn toàn. Redis Streams nhẹ hơn Kafka, phù hợp với hệ thống có throughput trung bình (ví dụ vài chục nghìn message/giây), nhưng không có ngưỡng cứng. Kafka mạnh hơn về lưu trữ lâu dài (disk), phân vùng (partition), và khả năng scale ngang. Tuy nhiên, với hầu hết ứng dụng vừa và nhỏ, Streams là đủ.

❓ Giới hạn số lượng channel trong Pub/Sub là bao nhiêu?

Trả lời:
Redis không giới hạn cứng số channel, nhưng mỗi channel sẽ tiêu tốn bộ nhớ cho pattern matching. Nếu có hàng chục nghìn channel, bạn nên thiết kế lại (ví dụ dùng pattern user:*) hoặc chuyển sang Streams.

❓ Làm sao để đảm bảo Pub/Sub không mất tin nhắn khi network bị gián đoạn?

Trả lời:
Không thể. Đó là giới hạn cố hữu. Thay vào đó, hãy dùng Redis Streams kết hợp với persistence (RDB/AOF) và cơ chế ack.


Kết luận

Redis cung cấp cả hai mô hình Pub/Sub (tức thời, không lưu) và Streams (có lưu trữ, consumer group). Việc lựa chọn đúng sẽ giúp bạn tránh mất dữ liệu và tối ưu chi phí vận hành.

  • Nếu chỉ cần broadcast real-time – dùng Pub/Sub.
  • Nếu cần lưu vết, xử lý đáng tin cậy, chia việc cho nhiều worker – dùng Streams.

Hy vọng qua bài viết này, bạn đã có thể tự tin thiết kế hệ thống event-driven với Redis và Node.js.

🎯 Thử thách nhỏ: Hãy kết hợp Redis Pub/Sub với Socket.io để build một ứng dụng chat hoàn chỉnh, có thể scale ngang qua nhiều server.

🔥 Đọc thêm bài: Hướng dẫn Socket.io: Xây dựng ứng dụng Chat Real-time từ A-Z!

Chia sẻ bài viết này