- Published on
Tích Hợp Model AI Với Thời Gian Xử Lý Lâu Vào API Sử Dụng FastAPI và Celery
- Authors
- Name
- Hoàng Hữu Mạnh
Tích Hợp Model AI Với Thời Gian Xử Lý Lâu Vào API Sử Dụng FastAPI và Celery
Trong thế giới phát triển ứng dụng hiện đại, việc tích hợp các mô hình AI phức tạp vào API để phục vụ nhu cầu người dùng là một thách thức lớn. Đặc biệt, khi mô hình AI của bạn có thời gian xử lý kéo dài đến một phút hoặc hơn, bạn cần một giải pháp hiệu quả để xử lý các yêu cầu một cách không đồng bộ, tránh làm nghẽn hệ thống. Trong bài viết này, chúng ta sẽ cùng tìm hiểu cách sử dụng FastAPI và Celery để tích hợp một model AI có thời gian xử lý lâu vào API, cùng với chức năng hủy (cancel) tác vụ.
Yêu Cầu
Trước khi bắt đầu, bạn cần chuẩn bị các công cụ và thư viện sau:
- Python: Ngôn ngữ lập trình chính.
- FastAPI: Framework để xây dựng API.
- Celery: Thư viện để quản lý và xử lý các tác vụ không đồng bộ.
- Redis: Hệ thống lưu trữ dữ liệu dạng key-value, được sử dụng như một message broker cho Celery.
Cài Đặt
Trước tiên, hãy cài đặt các thư viện cần thiết bằng pip:
pip install fastapi uvicorn celery[redis]
Cấu Hình Celery
Tiếp theo, chúng ta sẽ cấu hình Celery. Tạo một file tên là celery_app.py
để cấu hình Celery:
from celery import Celery, states
from celery.exceptions import Ignore
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@celery_app.task(bind=True)
def long_running_task(self, data):
try:
# Giả lập thời gian xử lý 1 phút
import time
for i in range(60):
if self.request.called_directly:
break
time.sleep(1)
result = {"status": "completed", "data": data}
return result
except Exception as e:
self.update_state(state=states.FAILURE, meta={'exc': str(e)})
raise Ignore()
Tạo API Với FastAPI
Tiếp theo, chúng ta sẽ tạo các endpoint API sử dụng FastAPI. Tạo một file tên là main.py
:
from fastapi import FastAPI, BackgroundTasks, HTTPException
from celery.result import AsyncResult
from celery_app import celery_app, long_running_task
app = FastAPI()
@app.post("/process")
async def process_data(data: dict, background_tasks: BackgroundTasks):
task = long_running_task.delay(data)
return {"task_id": task.id}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
task_result = AsyncResult(task_id, app=celery_app)
if task_result.state == "PENDING":
response = {"status": "pending"}
elif task_result.state != "FAILURE":
response = {"status": task_result.state, "result": task_result.result}
else:
response = {"status": "failure", "result": str(task_result.info)}
return response
@app.post("/cancel/{task_id}")
async def cancel_task(task_id: str):
task_result = AsyncResult(task_id, app=celery_app)
if task_result.state == "PENDING":
task_result.revoke(terminate=True)
response = {"status": "cancelled"}
elif task_result.state == "STARTED":
task_result.revoke(terminate=True)
response = {"status": "cancelled"}
else:
response = {"status": "cannot cancel", "current_state": task_result.state}
return response
Chạy Celery Worker
Trong một terminal khác, chạy Celery worker bằng lệnh sau:
celery -A celery_app worker --loglevel=info
Chạy FastAPI
Cuối cùng, chạy ứng dụng FastAPI bằng lệnh:
uvicorn main:app --reload
Kết Luận
Với hướng dẫn trên, bạn đã có thể tích hợp thành công một model AI với thời gian xử lý lâu vào API sử dụng FastAPI và Celery. Giải pháp này giúp bạn xử lý các yêu cầu đồng thời một cách hiệu quả, tránh làm nghẽn hệ thống, và cung cấp thêm chức năng hủy các tác vụ đang chờ hoặc đang chạy. Chúc bạn thành công trong việc triển khai và tối ưu hóa hệ thống của mình!