Published on

Pub/Sub - mô hình giao tiếp hiệu quả cho các hệ thống phân tán

Authors
  • avatar
    Name
    Hoàng Hữu Mạnh
    Twitter

Pub/Sub - mô hình giao tiếp hiệu quả cho các hệ thống phân tán

Trong các hệ thống phân tán ngày nay, việc các module, service, hệ thống giao tiếp và trao đổi dữ liệu với nhau là vô cùng quan trọng. Mô hình pub/sub (publisher/subscriber) chính là một giải pháp giao tiếp hiệu quả, linh hoạt cho các hệ thống phân tán, microservices.

Pub/Sub là gì?

Pub/Sub bao gồm 2 thành phần chính:

  • Publisher: module hoặc service tạo ra các sự kiện, tin nhắn và "publish" (đẩy, phát đi) chúng.
  • Subscriber: module hoặc service quan tâm và đăng ký nhận các sự kiện, tin nhắn từ publisher.

Khi có tin nhắn/sự kiện được publisher đẩy ra thì tất cả các subscriber đã đăng ký quan tâm sẽ nhận được và xử lý.

Pub/Sub mang đến nhiều lợi ích như:

  • Cho phép loose coupling giữa các module, service
  • Dễ dàng mở rộng bằng cách thêm subscriber mới
  • Tăng tính available vì không phụ thuộc vào một subscriber cụ thể nào

Ví dụ pub/sub với FastAPI và WebSocket

Để đơn giản hoá, chúng ta sẽ lấy ví dụ về một hệ thống các máy in (printer). Ta sẽ có các publisher sau:

  • PrinterPublisher: service gửi các sự kiện khi có việc in ấn nào đó xảy ra như báo toner hết, giấy hết, in xong,...

Các subscriber có thể là:

  • InMachineMonitor: service giám sát hoạt động của các máy in
  • MobileApp: ứng dụng mobile để xem trạng thái máy in
  • EmailNotifier: gửi email thông báo cho người quản trị máy in

Ta sẽ dùng FastAPI làm publisher, và WebSocket để subscriber đăng ký nhận sự kiện.

import asyncio

# Khai báo class Publisher
class Publisher:
    def __init__(self):
        self.subscribers = set()

    def register(self, who):
        self.subscribers.add(who)

    def unregister(self, who):
        self.subscribers.discard(who)

    async def publish(self, msg):
        for subscriber in self.subscribers:
            await subscriber.notify(msg)

# Khai báo class Subscriber    
class Subscriber:
    async def notify(self, publisher, msg):
        print(f"Nhận được {msg} từ {publisher}")

from fastapi import FastAPI
import asyncio

app = FastAPI()
printer_publisher = Publisher()

@app.websocket("/subscribe") 
async def subscribe(websocket):
    await websocket.accept()
    await printer_publisher.register(websocket)

@app.on_event("startup")
async def startup(): 
    await printer_publisher.publish("Printer service starting up...")
    
@app.on_event("shutdown")
async def shutdown():
    await printer_publisher.publish("Printer service shutting down...")

@app.post("/print")    
async def print(job):
   await printer_publisher.publish(f"Printing job {job}") 
   # logic to print something
   await printer_publisher.publish(f"Printed job {job} successfully")

Như vậy, bất kỳ dịch vụ nào cũng có thể subscribe qua WebSocket để nhận các update từ printer publisher mà không cần quan tâm đến chi tiết implement bên trong. Ví dụ InMachineMonitor có thể nhận được các notify để cập nhật trạng thái máy in trên dashboard của mình.

Hy vọng với ví dụ trên, bạn thấy được sự lợi ích và hiệu quả của pub/sub trong việc xây dựng các hệ thống phân tán. Chúc bạn thành công!