Published on

Xử lý đồng thời hiệu quả trong Python với Worker Pool

Authors
  • avatar
    Name
    Hoàng Hữu Mạnh
    Twitter

Xử lý đồng thời hiệu quả trong Python với Worker Pool

Xử lý đồng thời (concurrent processing) là khả năng xử lý nhiều task cùng lúc, giúp tăng hiệu suất và tốc độ xử lý so với xử lý tuần tự. Trong Python, do Global Interpreter Lock (GIL) mà chỉ có thể chạy một luồng (thread) trong một thời điểm. Vì vậy, chúng ta cần tới worker pool.

Worker pool là gì

Worker pool là một nhóm các worker (process/thread) chạy song song để giải quyết các task. Thay vì chỉ có single thread thì giờ đây chúng ta có một "pool" gồm nhiều worker.

Ưu điểm:

  • Xử lý đồng thời nhiều task, tăng hiệu suất
  • Tận dụng được nhiều CPU cores khác nhau

Python hỗ trợ sẵn một số cơ chế worker pool sau:

1. Process Pool

Được định nghĩa trong thư viện multiprocessing. Các worker ở đây thực chất là các process riêng biệt.

from multiprocessing import Pool
import os

def cube(n):
    print(f"Worker {os.getpid()} computing {n}")
    return n**3

if __name__ == "__main__":
    numbers = range(10)
    pool = Pool(processes=4)
    
    result = pool.map(cube, numbers)    
    print(result)

Ưu điểm là được chạy hoàn toàn độc lập, không bị GIL giới hạn.

Nhược điểm là tốn chi phí tạo process mới.

2. Thread Pool

Được định nghĩa trong thư viện concurrent.futures. Các worker ở đây vẫn là thread trong cùng một process.

from concurrent.futures import ThreadPoolExecutor
import threading

def cube(n):
    print(f"Worker {threading.current_thread().name} computing {n}")
    return n**3

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(cube, numbers)
print(list(results))

Ưu điểm là khởi tạo thread nhanh hơn process.

Nhược điểm là vẫn bị hạn chế bởi GIL.

3. Asyncio

Asyncio là thư viện hỗ trợ xử lý bất đồng bộ (asynchronous) trong Python.

Chúng ta có thể coi mỗi asyncio task như một "worker" và sử dụng asyncio.gather để chạy chúng đồng thời.

import asyncio
import concurrent.futures

async def cube(n):
    print(f"Computing {n}")
    await asyncio.sleep(1)  
    return n**3
   
async def main():
    numbers = range(10)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    
    futs = [asyncio.create_task(cube(n)) for n in numbers]   
    results = await asyncio.gather(*futs)
    print(results)

asyncio.run(main())

Ưu điểm là có thể "đợi đồng thời" các task một cách non-blocking.

Nhược điểm là vẫn bị giới hạn bởi số lượng thread trong pool.

Như vậy, tùy từng trường hợp mà lựa chọn cách thức xử lý đồng thời phù hợp với worker pool trong Python nhé.