MongoDB তে Change Stream একটি শক্তিশালী ফিচার যা ডেটাবেসে পরিবর্তন (insert, update, delete) হওয়া যেকোনো ইভেন্ট রিয়েল-টাইমে ট্র্যাক করার জন্য ব্যবহৃত হয়। Change Stream API MongoDB তে ডেটাবেসের উপর পরিবর্তনগুলি সবার আগে পেতে সাহায্য করে, এবং এটি MongoDB 3.6 এর পর থেকে সমর্থিত।
Change Stream API ব্যবহার করে MongoDB তে real-time data monitoring এবং trigger-based workflows তৈরি করা যেতে পারে। এটি বিশেষভাবে useful যখন আপনি অ্যাপ্লিকেশনে কোনো ডেটার পরিবর্তন মনিটর করতে চান।
Change Stream API এর মূল ধারণা
- Change Stream MongoDB তে ডেটাবেস বা কালেকশনের মধ্যে কোন পরিবর্তন ঘটলে একটি "stream" বা স্রোত তৈরি করে, যা পরিবর্তনগুলিকে রিয়েল-টাইমে পাঠায়।
- এটি ডেটাবেসের জন্য কার্যকরী, বিশেষত যখন অ্যাপ্লিকেশনকে ডেটা আপডেট বা পরিবর্তন পাওয়ার সাথে সাথে তা রিয়েল-টাইমে আপডেট করতে হয়।
1. Change Stream API কনফিগারেশন
MongoDB তে Change Stream ব্যবহার করতে হলে, প্রথমে আপনাকে একটি Replica Set তৈরি করতে হবে, কারণ Change Stream শুধুমাত্র Replica Set এ কাজ করে, একক MongoDB ইনস্ট্যান্সে নয়।
Replica Set Configuration Example
MongoDB Replica Set কনফিগার করার জন্য প্রথমে সার্ভার কনফিগারেশন করতে হবে:
mongod --replSet "rs0" --port 27017 --dbpath /data/db
এরপর MongoDB Replica Set ইনিশিয়ালাইজ করতে:
rs.initiate()
2. Change Stream শুরু করা
MongoDB তে Change Stream শুরু করতে, প্রথমে আপনাকে MongoCollection বা MongoDatabase এর উপর watch() মেথড ব্যবহার করতে হবে। এটি একটি stream তৈরি করবে এবং পরিবর্তনগুলো তাতে পাঠাবে।
Change Stream ব্যবহার করে ডেটাবেসের পরিবর্তন ট্র্যাক করা
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
public class MongoDBChangeStream {
public static void main(String[] args) {
// MongoDB ক্লায়েন্ট তৈরি করা
MongoClient mongoClient = new MongoClient("localhost", 27017);
// ডেটাবেস এবং কালেকশন নির্বাচন করা
MongoDatabase database = mongoClient.getDatabase("myDatabase");
MongoCollection<Document> collection = database.getCollection("myCollection");
// Change Stream শুরু করা
collection.watch().forEach(change -> {
System.out.println("Change detected: " + change.getFullDocument());
});
// MongoDB ক্লায়েন্ট বন্ধ করা
mongoClient.close();
}
}
এখানে, watch() মেথড ব্যবহার করে myCollection কালেকশনের উপর Change Stream ট্র্যাক করা হয়েছে। যখনই কোন পরিবর্তন হবে, তা রিয়েল-টাইমে কনসোলে দেখানো হবে।
3. Filtering Change Streams
MongoDB তে আপনি Change Stream এ কিছু ফিল্টারিং শর্ত প্রয়োগ করতে পারেন। উদাহরণস্বরূপ, আপনি যদি শুধু ডকুমেন্ট ইনসার্টের পরিবর্তন ট্র্যাক করতে চান, তবে তা এইভাবে করতে পারেন:
collection.watch(Arrays.asList(
Aggregates.match(Filters.eq("operationType", "insert"))
)).forEach(change -> {
System.out.println("Insert detected: " + change.getFullDocument());
});
এখানে, Filters.eq("operationType", "insert") শর্তে insert অপারেশন ফিল্টার করা হয়েছে, যার মাধ্যমে শুধু ইনসার্ট অপারেশনগুলো ট্র্যাক করা হবে।
4. Change Stream Event Types
MongoDB Change Stream এ বিভিন্ন ধরনের ইভেন্ট (অপারেশন) থাকে:
- insert: নতুন ডকুমেন্ট ইনসার্ট হওয়ার সময়।
- update: ডকুমেন্টে কোনো পরিবর্তন হওয়া (update)।
- replace: ডকুমেন্ট সম্পূর্ণরূপে প্রতিস্থাপিত হওয়া (replace).
- delete: ডকুমেন্ট মুছে ফেলা (delete).
- invalidate: কোনো ট্র্যাকিং অবস্থার পরিবর্তন হওয়ার সময়।
Example: Handling Different Types of Events
collection.watch().forEach(change -> {
String operationType = change.getOperationType().getValue();
switch (operationType) {
case "insert":
System.out.println("Document Inserted: " + change.getFullDocument());
break;
case "update":
System.out.println("Document Updated: " + change.getUpdateDescription());
break;
case "delete":
System.out.println("Document Deleted: " + change.getDocumentKey());
break;
default:
System.out.println("Other operation: " + operationType);
}
});
এখানে, পরিবর্তনগুলোর ধরন অনুযায়ী (insert, update, delete) আলাদা আলাদা লজিক কার্যকর করা হয়েছে।
5. Resume Tokens
MongoDB Change Streams resume token প্রদান করে, যা আপনাকে স্ট্রীম পুনরায় শুরু করতে সাহায্য করে, যদি কোনো কারণে স্ট্রীম বন্ধ হয়ে যায়। এটি Change Stream তে একটি নির্দিষ্ট অবস্থান থেকে পুনরায় শুরু করার জন্য ব্যবহৃত হয়।
Resume Token Example
ChangeStreamDocument<Document> change = collection.watch().first();
Object resumeToken = change.getResumeToken();
এটি ব্যবহার করে আপনি resume token দ্বারা MongoDB তে Change Stream পুনরায় শুরু করতে পারবেন।
6. Error Handling
Change Stream ব্যবহারের সময় কিছু ত্রুটি হতে পারে, যেমন:
- Network Issues: যদি MongoDB সার্ভারের সাথে সংযোগ বিচ্ছিন্ন হয়ে যায়, তাহলে Change Stream পুনরায় শুরু করতে হবে।
- Resume Failures: কিছু সময় স্ট্রীম পুনরায় শুরু করতে সমস্যার সম্মুখীন হতে পারেন।
Error Handling Example
try {
collection.watch().forEach(change -> {
// Process the change
});
} catch (Exception e) {
System.out.println("Error in Change Stream: " + e.getMessage());
}
7. Change Stream with Aggregation Pipelines
MongoDB Change Streams এ Aggregation Pipelines ব্যবহার করা যায়, যা আপনাকে আরও ফিল্টার এবং কাস্টম অপারেশন করতে সহায়তা করে। উদাহরণস্বরূপ, আপনি ইনসার্ট হওয়া ডকুমেন্টগুলোকে নির্দিষ্ট শর্তে ফিল্টার করতে পারেন।
Aggregation Pipeline Example
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "insert")),
Aggregates.project(Projections.include("fullDocument"))
);
collection.watch(pipeline).forEach(change -> {
System.out.println("Inserted Document: " + change.getFullDocument());
});
এখানে, Change Stream এ শুধুমাত্র insert অপারেশনগুলো ফিল্টার করা হয়েছে এবং fullDocument প্রজেক্ট করা হয়েছে।
সারাংশ
MongoDB তে Change Stream API ডেটাবেসের মধ্যে রিয়েল-টাইম পরিবর্তন ট্র্যাক করার জন্য একটি শক্তিশালী টুল। এটি ইনসার্ট, আপডেট, ডিলিট এবং অন্যান্য পরিবর্তনগুলিকে মনিটর করতে সাহায্য করে। Java তে MongoDB Change Stream ব্যবহার করে আপনি ডেটা পরিবর্তন হওয়ার সাথে সাথে প্রক্রিয়া শুরু করতে পারেন এবং অ্যাপ্লিকেশনকে রিয়েল-টাইম আপডেট করতে সক্ষম হবেন। Change Stream API MongoDB তে কর্মক্ষমতা, ডেটা ট্রিগার এবং রিয়েল-টাইম ফিচারগুলির জন্য গুরুত্বপূর্ণ একটি বৈশিষ্ট্য।