Published on

Làm thế nào để đồng bộ hóa truy cập các tài nguyên trong hệ thống phân tán? Sử dụng PostgreSQL Advisory Lock

Authors
  • avatar
    Name
    Hoàng Hữu Mạnh
    Twitter

Làm thế nào để đồng bộ hóa truy cập các tài nguyên trong hệ thống phân tán? Sử dụng PostgreSQL Advisory Lock

Trong các ứng dụng phân tán hiện đại, chúng ta thường xuyên phải đối mặt với vấn đề cần đồng bộ hóa truy cập vào các tài nguyên chung như cơ sở dữ liệu, file, queue,... giữa nhiều dịch vụ. Điều này nhằm đảm bảo tính nhất quán của dữ liệu, tránh các tình huống xung đột và race condition khi nhiều dịch vụ cùng truy cập một tài nguyên.

Một giải pháp đơn giản và hiệu quả là sử dụng cơ chế khóa phân tán (distributed locking). Ý tưởng là mỗi dịch vụ sẽ yêu cầu một khóa trước khi thực hiện các thao tác trên tài nguyên và giải phóng khóa sau khi xong.

Trong bài viết này, chúng ta sẽ tìm hiểu về tính năng PostgreSQL Advisory Lock - một cơ chế khóa phân tán được tích hợp sẵn trong PostgreSQL, giúp đơn giản hóa việc xây dựng các ứng dụng phân tán có sử dụng PostgreSQL.

PostgreSQL Advisory Lock là gì?

PostgreSQL Advisory Lock là một cơ chế khóa ở mức ứng dụng do PostgreSQL cung cấp, cho phép các ứng dụng định nghĩa và sử dụng các khóa có tên tuỳ ý để đồng bộ hóa truy cập giữa các phiên/kết nối.

Khóa tương hỗ được quản lý bởi một cơ chế bên trong PostgreSQL dựa trên bảng hash. Mỗi khóa có một ID (64 bit) được sử dụng như key của hash table.

Các khóa tương hỗ có những đặc điểm sau:

  • Chỉ tồn tại trong phiên PostgreSQL hiện tại. Nếu mất kết nối, khóa bị mất.
  • Các khóa được tạo và quản lý hoàn toàn trên phía client, server PostgreSQL không lưu trữ thông tin về khóa.
  • Các khóa không có quan hệ với bất kỳ data hoặc object cụ thể trong database. Chúng chỉ được dùng để đồng bộ hóa truy cập tài nguyên ở mức ứng dụng.

Sử dụng PostgreSQL Advisory Lock Để làm việc với khóa tương hỗ trong PostgreSQL, chúng ta sử dụng 2 hàm sau:

  • pg_advisory_lock(id bigint): Yêu cầu khóa với một ID nhất định. Nếu khóa đang được một phiên khác giữ, hàm sẽ chờ cho tới khi khóa được giải phóng.

  • pg_advisory_unlock(id bigint): Giải phóng khóa với ID tương ứng.

Ví dụ: đồng bộ hóa đọc ghi file Giả sử chúng ta cần đồng bộ hóa việc đọc ghi một file chung giữa nhiều tiến trình. Ta có thể sử dụng PostgreSQL advisory lock để triển khai điều này như sau:

import psycopg2
import time

# Hàm kết nối DB
def get_connection():
  return psycopg2.connect(host="..", dbname="..")

# Hàm yêu cầu khóa trước khi đọc ghi file
def lock_file(file_name):
  conn = get_connection()
  cur = conn.cursor()  
  # Dùng tên file làm ID của khóa 
  cur.execute("SELECT pg_advisory_lock(%s)", (hash(file_name),)) 
  return cur.fetchone()[0]

def unlock_file(file_name):
  conn = get_connection()
  cur = conn.cursor()
  cur.execute("SELECT pg_advisory_unlock(%s)", (hash(file_name),))

# Sử dụng khóa    
f = open("data.txt")

got_lock = lock_file(f.name)
if got_lock:
  # Đọc ghi file
  f.read()  
  f.write("Xin chào")
  
  unlock_file(f.name)  
else:
  # Không lấy được khóa
  print("File đang bị khóa") 

ví dụ trên để sử dụng PostgreSQL advisory lock đồng bộ hóa truy cập queue giữa các tiến trình producer và consumer.

Giả sử chúng ta có một queue được lưu trữ trong table queue trong PostgreSQL database. Table này có các trường như id, status, message.

Đoạn code Python dưới đây sử dụng khóa để đồng bộ hóa việc pop/push message vào queue giữa consumer và producer:

import psycopg2
import time

QUEUE_LOCK_ID = 10000 # ID khóa 

# Hàm consumer - lấy message từ queue
def get_message_from_queue():

  got_lock = try_advisory_lock(QUEUE_LOCK_ID)  
  if got_lock:
    # Có khóa rồi, đọc queue

    conn = get_db_conn() 
    cur = conn.cursor()

    # Pop message khỏi queue
    cur.execute("SELECT id, message FROM queue WHERE status = 'new' ORDER BY id LIMIT 1")
    msg = cur.fetchone()

    # Cập nhật status đã xử lý  
    cur.execute("UPDATE queue SET status = 'processed' WHERE id = %s", (msg[0],))

    unlock_advisory_lock(QUEUE_LOCK_ID)

    # Trả về message
    return msg
  
  else:
    # Đang bị khóa, quay lại sau
    time.sleep(0.1) 
    return get_message_from_queue()

# Hàm producer - đưa message vào trong queue 
def push_message_to_queue(msg):

  got_lock = try_advisory_lock(QUEUE_LOCK_ID)

  if got_lock:
    conn = get_db_conn()
    cur = conn.cursor() 

    # Thêm message mới    
    cur.execute("INSERT INTO queue (message, status) VALUES (%s, %s)", (msg, 'new'))

    unlock_advisory_lock(QUEUE_LOCK_ID)

Như vậy, với advisory lock của PostgreSQL, việc đồng bộ hóa truy cập vào queue được thực hiện một cách đơn giản và hiệu quả.

Ưu điểm của PostgreSQL Advisory Lock

  • Đơn giản, dễ sử dụng, không yêu cầu server hay thành phần bổ trợ.
  • Hiệu suất cao với hỗ trợ đợi khóa được tích hợp.
  • Dễ dàng tích hợp với các ứng dụng sử dụng PostgreSQL.

Kết luận PostgreSQL Advisory Lock là một cơ chế