Published on

Tích Hợp Model AI Với Thời Gian Xử Lý Lâu Vào API Sử Dụng FastAPI và Celery

Authors
  • avatar
    Name
    Hoàng Hữu Mạnh
    Twitter

Tích Hợp Model AI Với Thời Gian Xử Lý Lâu Vào API Sử Dụng FastAPI và Celery

Trong thế giới phát triển ứng dụng hiện đại, việc tích hợp các mô hình AI phức tạp vào API để phục vụ nhu cầu người dùng là một thách thức lớn. Đặc biệt, khi mô hình AI của bạn có thời gian xử lý kéo dài đến một phút hoặc hơn, bạn cần một giải pháp hiệu quả để xử lý các yêu cầu một cách không đồng bộ, tránh làm nghẽn hệ thống. Trong bài viết này, chúng ta sẽ cùng tìm hiểu cách sử dụng FastAPI và Celery để tích hợp một model AI có thời gian xử lý lâu vào API, cùng với chức năng hủy (cancel) tác vụ.

Yêu Cầu

Trước khi bắt đầu, bạn cần chuẩn bị các công cụ và thư viện sau:

  • Python: Ngôn ngữ lập trình chính.
  • FastAPI: Framework để xây dựng API.
  • Celery: Thư viện để quản lý và xử lý các tác vụ không đồng bộ.
  • Redis: Hệ thống lưu trữ dữ liệu dạng key-value, được sử dụng như một message broker cho Celery.

Cài Đặt

Trước tiên, hãy cài đặt các thư viện cần thiết bằng pip:

pip install fastapi uvicorn celery[redis]

Cấu Hình Celery

Tiếp theo, chúng ta sẽ cấu hình Celery. Tạo một file tên là celery_app.py để cấu hình Celery:

from celery import Celery, states
from celery.exceptions import Ignore

celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@celery_app.task(bind=True)
def long_running_task(self, data):
    try:
        # Giả lập thời gian xử lý 1 phút
        import time
        for i in range(60):
            if self.request.called_directly:
                break
            time.sleep(1)
        result = {"status": "completed", "data": data}
        return result
    except Exception as e:
        self.update_state(state=states.FAILURE, meta={'exc': str(e)})
        raise Ignore()

Tạo API Với FastAPI

Tiếp theo, chúng ta sẽ tạo các endpoint API sử dụng FastAPI. Tạo một file tên là main.py:

from fastapi import FastAPI, BackgroundTasks, HTTPException
from celery.result import AsyncResult
from celery_app import celery_app, long_running_task

app = FastAPI()

@app.post("/process")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    task = long_running_task.delay(data)
    return {"task_id": task.id}

@app.get("/status/{task_id}")
async def get_status(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    if task_result.state == "PENDING":
        response = {"status": "pending"}
    elif task_result.state != "FAILURE":
        response = {"status": task_result.state, "result": task_result.result}
    else:
        response = {"status": "failure", "result": str(task_result.info)}
    return response

@app.post("/cancel/{task_id}")
async def cancel_task(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    if task_result.state == "PENDING":
        task_result.revoke(terminate=True)
        response = {"status": "cancelled"}
    elif task_result.state == "STARTED":
        task_result.revoke(terminate=True)
        response = {"status": "cancelled"}
    else:
        response = {"status": "cannot cancel", "current_state": task_result.state}
    return response

Chạy Celery Worker

Trong một terminal khác, chạy Celery worker bằng lệnh sau:

celery -A celery_app worker --loglevel=info

Chạy FastAPI

Cuối cùng, chạy ứng dụng FastAPI bằng lệnh:

uvicorn main:app --reload

Kết Luận

Với hướng dẫn trên, bạn đã có thể tích hợp thành công một model AI với thời gian xử lý lâu vào API sử dụng FastAPI và Celery. Giải pháp này giúp bạn xử lý các yêu cầu đồng thời một cách hiệu quả, tránh làm nghẽn hệ thống, và cung cấp thêm chức năng hủy các tác vụ đang chờ hoặc đang chạy. Chúc bạn thành công trong việc triển khai và tối ưu hóa hệ thống của mình!