Flowable RxJava এর একটি কম্পোনেন্ট, যা Reactive Streams specification মেনে চলে এবং backpressure সমর্থন করে। এটি Observable এর মতোই কাজ করে, তবে বড় ডেটা স্ট্রিম বা দ্রুতগতিতে emitted ডেটার ক্ষেত্রে আরও কার্যকর।
Backpressure: যখন ডেটা প্রোডিউসার (Observable) বেশি গতিতে ডেটা emit করে, কিন্তু কনজিউমার (Observer) সেই ডেটা প্রক্রিয়া করতে ধীর হয়, তখন সেই পরিস্থিতি হ্যান্ডেল করাকে backpressure handling বলা হয়। Flowable এই সমস্যা সমাধানে ডিজাইন করা হয়েছে।
Flowable কেন ব্যবহার করা হয়?
- Backpressure Management:
যদি ডেটা স্ট্রিম খুব দ্রুতগতিতে ডেটা emit করে, তাহলে Flowable সেই ডেটা কীভাবে manage করবে তা নিয়ন্ত্রণ করে।
উদাহরণ: Real-time sensor data, log streams। - High-Volume Data Streams:
বড় ডেটা স্ট্রিম বা high-frequency asynchronous events handle করার জন্য। - Memory Overhead কমানো:
Backpressure strategy ব্যবহার করে অতিরিক্ত ডেটা memory তে জমা না করে discard, buffer, বা অন্য কোনো উপায়ে manage করা যায়।
Flowable ব্যবহার করার পদ্ধতি
Flowable তৈরি করার দুটি উপায় রয়েছে:
- Flowable.create():
যখন আপনাকে custom logic সহ একটি Flowable তৈরি করতে হয়। - Flowable.fromXXX():
যখন Observable বা Iterable থেকে Flowable তৈরি করতে হয়।
উদাহরণ ১: Flowable ব্যবহার
import io.reactivex.Flowable;
public class FlowableExample {
public static void main(String[] args) {
Flowable<Integer> flowable = Flowable.range(1, 1000) // Emits 1000 integers
.onBackpressureDrop(); // Drops excess data if backpressure occurs
flowable.subscribe(
item -> {
// Simulating slow processing
Thread.sleep(10);
System.out.println("Received: " + item);
},
throwable -> System.err.println("Error: " + throwable),
() -> System.out.println("Completed!")
);
}
}
Flowable এর Backpressure Strategy
Flowable-এ কয়েকটি built-in backpressure strategy রয়েছে:
Buffer:
সমস্ত ডেটা একটি buffer-এ জমা করে, যতক্ষণ না কনজিউমার সেই ডেটা প্রক্রিয়া করে।Flowable<Integer> flowable = Flowable.range(1, 1000) .onBackpressureBuffer();Drop:
Backpressure হলে অতিরিক্ত ডেটা discard করে।Flowable<Integer> flowable = Flowable.range(1, 1000) .onBackpressureDrop();Latest:
কনজিউমার যখন নতুন ডেটা প্রসেস করার জন্য প্রস্তুত হয়, তখন শুধুমাত্র সর্বশেষ emitted ডেটা প্রেরণ করে।Flowable<Integer> flowable = Flowable.range(1, 1000) .onBackpressureLatest();Error:
Backpressure হলে error throw করে।Flowable<Integer> flowable = Flowable.range(1, 1000) .onBackpressureError();
উদাহরণ ২: Observable থেকে Flowable
import io.reactivex.Observable;
import io.reactivex.Flowable;
public class ObservableToFlowable {
public static void main(String[] args) {
Observable<Integer> observable = Observable.range(1, 1000);
Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
flowable.subscribe(
item -> System.out.println("Received: " + item),
throwable -> System.err.println("Error: " + throwable)
);
}
}
Flowable এর ব্যবহার ক্ষেত্র
- Sensor Data Processing:
High-frequency sensor data manage করতে। - Logging Systems:
দ্রুতগতির log streams পরিচালনা করতে। - Video Streaming:
High-volume multimedia data stream পরিচালনা করতে। - Network Requests:
API responses এর ক্ষেত্রে backpressure হ্যান্ডল করতে।
Observable বনাম Flowable
| Observable | Flowable |
|---|---|
| Backpressure support নেই। | Backpressure handle করতে পারে। |
| ছোট ডেটা স্ট্রিমের জন্য উপযুক্ত। | বড় ডেটা স্ট্রিম বা high-frequency data এর জন্য উপযুক্ত। |
| Simple applications-এ ব্যবহার হয়। | Complex এবং resource-intensive applications-এ ব্যবহৃত হয়। |
সংক্ষেপে:
Flowable একটি শক্তিশালী কম্পোনেন্ট, যা high-volume এবং fast-paced ডেটা স্ট্রিমকে handle করতে সক্ষম। এর built-in backpressure strategies asynchronous programming এবং large-scale applications-এ কার্যকরভাবে ব্যবহৃত হয়।
Read more