Background Tasks এবং Distributed Task Queue

Web Development - ফাস্টএপিআই (FastAPI) - FastAPI এবং Celery এর Integration (Task Queue)
205

FastAPI তে Background Tasks এবং Distributed Task Queue ব্যবস্থাপনা একটি গুরুত্বপূর্ণ ফিচার যা অ্যাপ্লিকেশনের পারফরম্যান্স এবং স্কেলেবিলিটি বাড়ানোর জন্য ব্যবহৃত হয়। যখন আপনার অ্যাপ্লিকেশনকে ব্যাকগ্রাউন্ডে কাজ চালানোর জন্য সক্ষম করতে হয় বা long-running tasks পরিচালনা করতে হয়, তখন Background Tasks এবং Distributed Task Queue ব্যবহৃত হয়।

এখানে আমরা FastAPI এবং Celery এর ইন্টিগ্রেশন এবং ব্যাকগ্রাউন্ড টাস্ক ব্যবস্থাপনা কিভাবে করা যায় তা দেখব।


1. Background Tasks in FastAPI

FastAPI তে Background Tasks ব্যবহারের মাধ্যমে আপনি এক্সপেন্সিভ বা টাইম-কনজিউমিং কাজগুলো ব্যাকগ্রাউন্ডে চালাতে পারেন, যাতে এটি প্রধান HTTP রিকোয়েস্ট প্রসেসিং এর সময় বিলম্ব না ঘটায়।

উদাহরণ: Background Tasks

FastAPI তে BackgroundTask ব্যবহার করার জন্য BackgroundTasks ক্লাসটি ইনজেক্ট করতে হয়, এবং এতে এক বা একাধিক টাস্ক ব্যাকগ্রাউন্ডে চালানো হয়।

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

# ব্যাকগ্রাউন্ড ফাংশন
def write_log(message: str):
    time.sleep(5)
    with open("log.txt", mode="a") as log:
        log.write(message + "\n")

# এন্ডপয়েন্ট যেখানে ব্যাকগ্রাউন্ড টাস্ক ব্যবহৃত হবে
@app.get("/send-email/")
async def send_email(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Email sent successfully!")
    return {"message": "Email has been sent in the background."}

ব্যাখ্যা:

  • BackgroundTasks: এটি FastAPI এর এক্সটেনশন যা ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করতে সাহায্য করে।
  • add_task(): ব্যাকগ্রাউন্ডে যেকোনো ফাংশন (এখানে write_log) চালানোর জন্য এটি ব্যবহার করা হয়।

রিকোয়েস্ট উদাহরণ:

GET /send-email/

রেসপন্স:

{
  "message": "Email has been sent in the background."
}

এখানে, write_log ফাংশনটি ব্যাকগ্রাউন্ডে কাজ করবে, এবং ব্যবহারকারী রেসপন্স পাবে কিন্তু কাজটি সম্পন্ন হতে কিছু সময় নিবে।


2. Celery এর মাধ্যমে Distributed Task Queue

Celery একটি distributed task queue ব্যবস্থা যা FastAPI এর সাথে খুব সহজেই ইন্টিগ্রেট করা যায়। Celery ব্যবহার করে আপনি অ্যাসিঙ্ক্রোনাস বা ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করতে পারেন, যা স্কেলেবল এবং ডিস্ট্রিবিউটেড।

Step 1: Celery ইনস্টল করা

Celery এবং Redis বা RabbitMQ (message broker) ইনস্টল করতে হবে।

pip install celery redis

এখানে, আমরা Redis ব্যবহার করব যেহেতু এটি জনপ্রিয় message broker হিসেবে ব্যবহৃত হয়।

Step 2: Celery অ্যাপ কনফিগারেশন

FastAPI অ্যাপ্লিকেশনে Celery যোগ করার জন্য আপনাকে একটি Celery অ্যাপ কনফিগার করতে হবে।

# celery_app.py
from celery import Celery

# Redis broker ব্যবহার করে Celery অ্যাপ তৈরি
celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def send_email_task(email: str):
    # দীর্ঘ সময়ের কাজ যেমন ইমেইল পাঠানো
    time.sleep(5)
    return f"Email sent to {email}"

এখানে:

  • Celery('tasks', broker='redis://localhost:6379/0'): Celery অ্যাপ এবং Redis ব্রোকার কনফিগার করা হচ্ছে।
  • send_email_task: একটি Celery টাস্ক যা ব্যাকগ্রাউন্ডে ইমেইল পাঠানোর মতো কাজ করবে।

Step 3: FastAPI তে Celery ইন্টিগ্রেট করা

from fastapi import FastAPI
from celery_app import send_email_task

app = FastAPI()

@app.get("/send-email/{email}")
async def send_email(email: str):
    send_email_task.delay(email)  # Celery টাস্কে কল করা
    return {"message": "Email is being sent in the background."}

ব্যাখ্যা:

  • send_email_task.delay(email): এটি Celery টাস্ককে asynchronous ভাবে রান করতে সাহায্য করে।
  • delay(): Celery টাস্ক রান করার জন্য এটি asynchronous কল ব্যবহার করা হয়।

Step 4: Celery Worker রান করা

Celery worker রান করার জন্য আপনাকে আলাদা কমান্ডে Celery চালাতে হবে:

celery -A celery_app.celery_app worker --loglevel=info

এটি Redis সার্ভারের মাধ্যমে কাজ করবে এবং বিভিন্ন টাস্ক প্রসেস করবে।

Step 5: FastAPI অ্যাপ চালানো

FastAPI অ্যাপ চালানোর জন্য:

uvicorn main:app --reload

রিকোয়েস্ট উদাহরণ:

GET /send-email/test@example.com

রেসপন্স:

{
  "message": "Email is being sent in the background."
}

এই কমান্ডে, send_email_task টাস্কটি Redis তে প্রেরিত হবে এবং Celery worker তা প্রসেস করবে, এদিকে FastAPI অ্যাপ্লিকেশন ক্লায়েন্টকে দ্রুত রেসপন্স প্রদান করবে।


3. Celery এবং FastAPI তে Error Handling এবং Retries

Celery তে টাস্কের জন্য error handling এবং retries সেটআপ করা যায়। উদাহরণস্বরূপ:

@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, email: str):
    try:
        # ইমেইল পাঠানোর কাজ
        # simulate error
        if email == "error@example.com":
            raise ValueError("Simulated error")
        return f"Email sent to {email}"
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

এখানে:

  • max_retries=3: টাস্কটি সর্বাধিক ৩ বার পুনরায় চেষ্টা করবে।
  • retry(): যদি কোনো ত্রুটি ঘটে, তাহলে টাস্কটি পুনরায় চেষ্টা করবে।

FastAPI তে Background Tasks এবং Celery ব্যবহার করে asynchronous এবং distributed task queue ব্যবস্থা তৈরি করা সহজ এবং কার্যকর। Background Tasks ছোট ও সহজ কাজের জন্য এবং Celery বড় এবং স্কেলেবল অ্যাসিঙ্ক্রোনাস কাজের জন্য ব্যবহৃত হয়। এই সিস্টেম ব্যবহারের মাধ্যমে আপনি অ্যাপ্লিকেশনকে আরও স্কেলেবল এবং পারফরম্যান্ট তৈরি করতে পারেন। Celery সহ ডিস্ট্রিবিউটেড টাস্ক কিউ ব্যবস্থাপনা উন্নত পারফরম্যান্স এবং সুরক্ষা প্রদান করে।

Content added By
Promotion
NEW SATT AI এখন আপনাকে সাহায্য করতে পারে।

Are you sure to start over?

Loading...