Observable হলো RxJava-এর একটি প্রধান কম্পোনেন্ট, যা data emitter (ডেটা প্রেরণকারী) হিসেবে কাজ করে। এটি ডেটা বা ইভেন্টের একটি ধারাবাহিক স্ট্রিম প্রদান করে। Observer (ডেটা গ্রহণকারী) এই স্ট্রিম থেকে ডেটা সংগ্রহ করে এবং সেটির উপর কাজ করে।
Observable মূলত Publisher এর কাজ করে এবং Reactive Programming এর Publish-Subscribe Pattern অনুসরণ করে।
Observable কিভাবে কাজ করে?
Observable-এর কাজের প্রধান ধাপগুলো নিম্নরূপ:
- Create:
Observable ডেটা স্ট্রিম তৈরি করে। এটি ডেটা emit (প্রেরণ) করার জন্য দায়ী। - Subscribe:
একটি Observer বা Subscriber Observable-এ subscribe করে। এটি ডেটা বা ইভেন্ট গ্রহণ করার জন্য প্রস্তুত হয়। - Emit:
Observable বিভিন্ন ধরণের ডেটা emit করে (যেমন, single value, multiple values, বা error)। এটি তিনটি প্রকারের notification পাঠাতে পারে:- onNext(): প্রতিটি ডেটা আইটেম পাঠানোর জন্য।
- onError(): কোনো ত্রুটি হলে।
- onComplete(): ডেটা স্ট্রিম শেষ হলে।
- Dispose:
Subscription বন্ধ করার জন্য dispose() ব্যবহার করা হয়। এটি মেমোরি লিক এড়াতে সাহায্য করে।
Observable-এর উদাহরণ
1. Simple Observable Example:
import io.reactivex.Observable;
public class ObservableExample {
public static void main(String[] args) {
// Observable তৈরি
Observable<String> observable = Observable.just("Hello", "RxJava", "World");
// Observer তৈরি এবং Subscribe করা
observable.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);
}
}
আউটপুট:
Received: Hello
Received: RxJava
Received: World
Completed!
2. Observable এবং Operators:
import io.reactivex.Observable;
public class ObservableWithOperators {
public static void main(String[] args) {
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
observable
.map(number -> number * 2) // প্রতিটি সংখ্যাকে ২ দিয়ে গুন
.filter(number -> number > 5) // শুধুমাত্র > 5 এর সংখ্যা নির্বাচন
.subscribe(
item -> System.out.println("Processed: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream Completed!") // onComplete
);
}
}
আউটপুট:
Processed: 6
Processed: 8
Processed: 10
Stream Completed!
3. Observable থেকে Error Handling:
import io.reactivex.Observable;
public class ObservableErrorHandling {
public static void main(String[] args) {
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Something went wrong!"));
emitter.onNext(3); // এটি আর পাঠানো হবে না
emitter.onComplete();
});
observable.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error.getMessage()), // Error Handling
() -> System.out.println("Completed!")
);
}
}
আউটপুট:
Received: 1
Received: 2
Error: Something went wrong!
Observable তৈরি করার পদ্ধতি
RxJava-তে Observable তৈরির বিভিন্ন উপায় রয়েছে:
just(): এক বা একাধিক নির্দিষ্ট ডেটা emit করে।
Observable<Integer> observable = Observable.just(1, 2, 3);fromIterable(): Iterable collection থেকে ডেটা emit করে।
List<String> items = Arrays.asList("A", "B", "C"); Observable<String> observable = Observable.fromIterable(items);create(): Custom ডেটা স্ট্রিম তৈরি করতে।
Observable<Integer> observable = Observable.create(emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); });range(): একটি নির্দিষ্ট সীমার মধ্যে সংখ্যা emit করে।
Observable<Integer> observable = Observable.range(1, 5);interval(): নির্দিষ্ট সময়ের বিরতিতে ডেটা emit করে (সাধারণত asynchronous কাজের জন্য ব্যবহৃত)।
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
Observable-এর সুবিধা
- Asynchronous এবং Event-driven Programming সহজ করে।
- Functional এবং Declarative কোডিং স্টাইল।
- Operators-এর মাধ্যমে সহজ ডেটা প্রক্রিয়াকরণ।
- Thread Management-এর জন্য Schedulers ব্যবহার।
Observable এর সীমাবদ্ধতা
- শিখতে কিছুটা সময় লাগে।
- Debugging অনেক সময় জটিল হয়ে যায়।
- Subscription সঠিকভাবে dispose() না করলে মেমোরি লিক হতে পারে।
RxJava-তে Observable একটি গুরুত্বপূর্ণ ভূমিকা পালন করে এবং এটি asynchronous programming আরও কার্যকর ও সহজতর করে।
Read more