Flask-এ Celery ব্যবহার করা হয় asynchronous task processing বা background task processing পরিচালনা করার জন্য। Celery একটি অত্যন্ত শক্তিশালী এবং জনপ্রিয় টুল, যা Flask অ্যাপ্লিকেশনে ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করতে সাহায্য করে, যেমন ইমেইল পাঠানো, ফাইল প্রসেসিং, বা বড় ডেটা প্রসেসিং।
Celery সাধারণত Task Queue ম্যানেজমেন্টের জন্য ব্যবহৃত হয়, যেখানে টাস্কগুলো একটি Queue তে রাখা হয় এবং worker দ্বারা প্রসেস করা হয়। এটি Flask অ্যাপ্লিকেশনে ফিচারগুলোর কার্যকারিতা উন্নত করতে সাহায্য করে, যেমন দীর্ঘ সময় নেওয়া কাজগুলিকে প্রধান থ্রেডে ব্লক না করে ব্যাকগ্রাউন্ডে চালানো।
এই টিউটোরিয়ালে আমরা Flask অ্যাপে Celery ব্যবহার করে Task Queue Management কিভাবে করতে হয় তা দেখব।
১. Celery সেটআপ
Celery কাজ করার জন্য কিছু নির্দিষ্ট কনফিগারেশন এবং ব্যবস্থাপনা প্রয়োজন। সাধারণত এটি একটি message broker যেমন RabbitMQ বা Redis ব্যবহার করে।
ধাপ ১: Celery ইনস্টলেশন
প্রথমে Flask এবং Celery ইনস্টল করতে হবে। আপনি pip ব্যবহার করে Flask এবং Celery ইনস্টল করতে পারেন:
pip install flask
pip install celery
pip install redis # Redis সার্ভার ব্যবহার করতে হলে
Redis সার্ভার ব্যবহারের জন্য, আপনি আপনার লোকাল মেশিনে Redis ইনস্টল করতে হবে। অথবা, Docker ব্যবহার করে Redis চালু করতে পারেন।
docker run -p 6379:6379 redis
ধাপ ২: Flask অ্যাপ্লিকেশনে Celery ইন্টিগ্রেশন
Flask অ্যাপ্লিকেশনে Celery কনফিগার করতে Celery এবং Redis এর কনফিগারেশন যোগ করা হয়।
from flask import Flask, jsonify
from celery import Celery
app = Flask(__name__)
# Celery কনফিগারেশন
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' # Redis URL
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# Celery ইনস্ট্যান্স তৈরি করা
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@app.route('/process_task')
def process_task():
task = long_task.apply_async() # ব্যাকগ্রাউন্ড টাস্ক চালানো
return jsonify({"task_id": task.id}), 202
@celery.task
def long_task():
import time
time.sleep(10) # 10 সেকেন্ড স্লিপ করে দীর্ঘ সময় নেওয়া কাজ
return 'Task completed!'
if __name__ == "__main__":
app.run(debug=True)
এখানে:
CELERY_BROKER_URL: এটি Redis সার্ভারের URL।CELERY_RESULT_BACKEND: এটি যেখানে টাস্কের ফলাফল সংরক্ষণ করবে, সাধারণত Redis বা অন্য ডেটাবেস ব্যবহার করা হয়।long_task(): এটি একটি Celery task যা ব্যাকগ্রাউন্ডে রান করবে।
২. Task Queue Management
Celery অ্যাপ্লিকেশনটি মূলত টাস্কগুলো কিউতে রেখে সেগুলোকে প্রসেস করতে সাহায্য করে। যখন একটি টাস্ক সাবমিট করা হয়, তখন Celery সেই টাস্ককে কিউতে রাখে এবং worker দ্বারা প্রসেস করা হয়। এটি অ্যাসিঙ্ক্রোনাসভাবে কাজ করে, অর্থাৎ Flask অ্যাপ্লিকেশন ব্যাকগ্রাউন্ডে টাস্ক প্রক্রিয়াকরণের জন্য থামবে না।
উদাহরণ:
আপনি /process_task URL এ গিয়ে long_task টাস্কটিকে asynchronous ভাবে শুরু করতে পারেন।
curl http://127.0.0.1:5000/process_task
এটি একটি HTTP রিকোয়েস্ট পাঠাবে এবং Celery টাস্কটি Redis কিউতে রেখে দিতে হবে।
৩. Celery Worker চালানো
Celery-তে টাস্ক প্রসেস করার জন্য একটি worker চালানো দরকার। এটি ফ্লাস্ক অ্যাপের বাইরে চলে এবং Redis কিউ থেকে টাস্কগুলি প্রসেস করে।
Celery worker চালাতে:
celery -A app.celery worker
এখানে:
-A app.celery: এটি Flask অ্যাপেরceleryইনস্ট্যান্সকে নির্দেশ দেয়।worker: Celery worker প্রসেস শুরু করে, যা কিউ থেকে টাস্ক নেবে এবং প্রসেস করবে।
এখন, যদি আপনি /process_task URL এ একটি রিকোয়েস্ট পাঠান, তাহলে Celery worker এই টাস্কটিকে Redis কিউ থেকে নিয়ে প্রসেস করবে।
৪. Celery Result Retrieval
Celery-তে আপনি একটি টাস্কের ফলাফল সংগ্রহ করতে পারেন। এটি AsyncResult ব্যবহার করে করা হয়, যার মাধ্যমে আপনি টাস্কের স্ট্যাটাস এবং ফলাফল জানতে পারবেন।
@app.route('/task_status/<task_id>')
def task_status(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING':
return jsonify({"status": "Task is pending"})
elif task.state != 'FAILURE':
return jsonify({"status": task.state, "result": task.result})
else:
return jsonify({"status": "Task failed"})
এখানে:
AsyncResult: এটি টাস্কের বর্তমান স্ট্যাটাস এবং ফলাফল ফেচ করতে ব্যবহৃত হয়।
৫. Flask এবং Celery এর মাধ্যমে কনক্রিট টাস্কের উদাহরণ
ধরা যাক, আপনি একটি বড় ডেটা প্রক্রিয়া চালাতে চান যা দীর্ঘ সময় নেয়, যেমন ইমেইল পাঠানো বা রিপোর্ট জেনারেশন। আপনি সেই কাজটি Celery দিয়ে ব্যাকগ্রাউন্ডে চালিয়ে দিতে পারেন, যাতে Flask অ্যাপ্লিকেশন থামবে না এবং ব্যবহারকারীকে দ্রুত সাড়া দেওয়া যায়।
উদাহরণ:
from flask import Flask, render_template
from celery import Celery
import time
app = Flask(__name__)
# Celery কনফিগারেশন
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/start_task')
def start_task():
task = background_task.apply_async() # টাস্ক শুরু
return f'Task started with id {task.id}'
@celery.task
def background_task():
time.sleep(30) # 30 সেকেন্ডের জন্য স্লিপ করা
return 'Task completed'
if __name__ == "__main__":
app.run(debug=True)
এখানে:
background_task(): এই টাস্কটি Celery worker দ্বারা 30 সেকেন্ডের জন্য প্রসেস হবে।apply_async(): এটি টাস্কটিকে asynchronously 실행 করার জন্য ব্যবহৃত হয়।
Flask এবং Celery এর সংমিশ্রণ ওয়েব অ্যাপ্লিকেশনগুলোতে background task management বা asynchronous task processing পরিচালনা করার জন্য খুবই শক্তিশালী এবং কার্যকরী। Celery দিয়ে আপনি দীর্ঘ সময় নেওয়া কাজগুলোকে ব্যাকগ্রাউন্ডে রাখতে পারেন, যেমন ইমেইল পাঠানো, রিপোর্ট জেনারেশন, বা বড় ডেটা প্রসেসিং। Celery worker এবং Redis ব্যবহারের মাধ্যমে আপনার অ্যাপ্লিকেশন দ্রুত এবং স্কেলেবল হয়, যা ব্যবহারকারীদের জন্য উন্নত অভিজ্ঞতা নিশ্চিত করে।