RxJava-তে Observable এবং Observer হল দুটি মূল উপাদান, যা Reactive Programming-এর মডেলের ভিত্তি গঠন করে। এগুলি ডেটা স্ট্রিম তৈরি ও প্রক্রিয়াকরণ করতে ব্যবহৃত হয়।
Observable:
Observable একটি ডেটা স্ট্রিম উৎপন্ন করে, যা এক বা একাধিক Observer-কে ডেটা সরবরাহ করে। এটি ডেটা "প্রকাশকারী" (Publisher) হিসেবে কাজ করে।
Observable-এর ভূমিকা:
- ডেটা উৎপন্ন করে।
- Observer-কে সেই ডেটা প্রদান করে।
- প্রক্রিয়ার শেষে "complete" বা "error" সংকেত দেয়।
Observable তৈরি করার পদ্ধতি:
just(): একটি নির্দিষ্ট ডেটা প্রদান করে।
Observable<String> observable = Observable.just("Item 1", "Item 2", "Item 3");fromArray(): একটি array থেকে Observable তৈরি করে।
Observable<Integer> observable = Observable.fromArray(1, 2, 3, 4, 5);create(): কাস্টম Observable তৈরি করতে।
Observable<String> observable = Observable.create(emitter -> { emitter.onNext("Hello"); emitter.onNext("RxJava"); emitter.onComplete(); });
Observer:
Observer হল সেই উপাদান, যা Observable-এর ডেটা সাবস্ক্রাইব করে এবং সেই ডেটা প্রক্রিয়া করে। এটি ডেটা "গ্রাহক" (Subscriber) হিসেবে কাজ করে।
Observer-এর ভূমিকা:
- Observable-এর ডেটা গ্রহণ করে।
- প্রাপ্ত ডেটার উপর প্রক্রিয়াকরণ সম্পন্ন করে।
- Observable থেকে "complete" বা "error" সংকেতের প্রতিক্রিয়া জানায়।
Observer তৈরি করার পদ্ধতি:
Anonymous Class ব্যবহার করে:
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(String item) { System.out.println("Received: " + item); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onComplete() { System.out.println("Done!"); } };Lambda Expressions ব্যবহার করে:
সহজ কোডিং স্টাইল।observable.subscribe( item -> System.out.println("Received: " + item), // onNext error -> System.err.println("Error: " + error), // onError () -> System.out.println("Done!") // onComplete );
Observable এবং Observer-এর Workflow:
- Observable ডেটা স্ট্রিম তৈরি করে।
- Observer সেই Observable-এ সাবস্ক্রাইব করে।
- Observable ডেটা (onNext), error (onError), এবং শেষের সংকেত (onComplete) প্রদান করে।
উদাহরণ:
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class RxJavaExample {
public static void main(String[] args) {
// Observable তৈরি করা
Observable<String> observable = Observable.just("Hello", "RxJava");
// Observer তৈরি করা
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed to Observable");
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("All data received!");
}
};
// Observable-এ Observer সাবস্ক্রাইব করা
observable.subscribe(observer);
}
}
Key Points:
- Observable ডেটা তৈরি ও প্রেরণ করে।
- Observer সেই ডেটা গ্রহণ ও প্রক্রিয়া করে।
onNext(),onError(), এবংonComplete()মেথডগুলো ব্যবহার করে ডেটা এবং অবস্থা পর্যবেক্ষণ করা হয়।- Observable এবং Observer-এর মধ্যে সম্পর্ক subscribe() মেথডের মাধ্যমে স্থাপন করা হয়।
Reactive Programming-এর ভিত্তি এই ধারণাগুলির উপর নির্ভরশীল। এগুলি বুঝে নিলে RxJava ব্যবহার অনেক সহজ হয়ে যায়।
Observable হলো RxJava-এর একটি প্রধান কম্পোনেন্ট, যা data emitter (ডেটা প্রেরণকারী) হিসেবে কাজ করে। এটি ডেটা বা ইভেন্টের একটি ধারাবাহিক স্ট্রিম প্রদান করে। Observer (ডেটা গ্রহণকারী) এই স্ট্রিম থেকে ডেটা সংগ্রহ করে এবং সেটির উপর কাজ করে।
Observable মূলত Publisher এর কাজ করে এবং Reactive Programming এর Publish-Subscribe Pattern অনুসরণ করে।
Observable কিভাবে কাজ করে?
Observable-এর কাজের প্রধান ধাপগুলো নিম্নরূপ:
- Create:
Observable ডেটা স্ট্রিম তৈরি করে। এটি ডেটা emit (প্রেরণ) করার জন্য দায়ী। - Subscribe:
একটি Observer বা Subscriber Observable-এ subscribe করে। এটি ডেটা বা ইভেন্ট গ্রহণ করার জন্য প্রস্তুত হয়। - Emit:
Observable বিভিন্ন ধরণের ডেটা emit করে (যেমন, single value, multiple values, বা error)। এটি তিনটি প্রকারের notification পাঠাতে পারে:- onNext(): প্রতিটি ডেটা আইটেম পাঠানোর জন্য।
- onError(): কোনো ত্রুটি হলে।
- onComplete(): ডেটা স্ট্রিম শেষ হলে।
- Dispose:
Subscription বন্ধ করার জন্য dispose() ব্যবহার করা হয়। এটি মেমোরি লিক এড়াতে সাহায্য করে।
Observable-এর উদাহরণ
1. Simple Observable Example:
import io.reactivex.Observable;
public class ObservableExample {
public static void main(String[] args) {
// Observable তৈরি
Observable<String> observable = Observable.just("Hello", "RxJava", "World");
// Observer তৈরি এবং Subscribe করা
observable.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);
}
}
আউটপুট:
Received: Hello
Received: RxJava
Received: World
Completed!
2. Observable এবং Operators:
import io.reactivex.Observable;
public class ObservableWithOperators {
public static void main(String[] args) {
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
observable
.map(number -> number * 2) // প্রতিটি সংখ্যাকে ২ দিয়ে গুন
.filter(number -> number > 5) // শুধুমাত্র > 5 এর সংখ্যা নির্বাচন
.subscribe(
item -> System.out.println("Processed: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream Completed!") // onComplete
);
}
}
আউটপুট:
Processed: 6
Processed: 8
Processed: 10
Stream Completed!
3. Observable থেকে Error Handling:
import io.reactivex.Observable;
public class ObservableErrorHandling {
public static void main(String[] args) {
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Exception("Something went wrong!"));
emitter.onNext(3); // এটি আর পাঠানো হবে না
emitter.onComplete();
});
observable.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error.getMessage()), // Error Handling
() -> System.out.println("Completed!")
);
}
}
আউটপুট:
Received: 1
Received: 2
Error: Something went wrong!
Observable তৈরি করার পদ্ধতি
RxJava-তে Observable তৈরির বিভিন্ন উপায় রয়েছে:
just(): এক বা একাধিক নির্দিষ্ট ডেটা emit করে।
Observable<Integer> observable = Observable.just(1, 2, 3);fromIterable(): Iterable collection থেকে ডেটা emit করে।
List<String> items = Arrays.asList("A", "B", "C"); Observable<String> observable = Observable.fromIterable(items);create(): Custom ডেটা স্ট্রিম তৈরি করতে।
Observable<Integer> observable = Observable.create(emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); });range(): একটি নির্দিষ্ট সীমার মধ্যে সংখ্যা emit করে।
Observable<Integer> observable = Observable.range(1, 5);interval(): নির্দিষ্ট সময়ের বিরতিতে ডেটা emit করে (সাধারণত asynchronous কাজের জন্য ব্যবহৃত)।
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
Observable-এর সুবিধা
- Asynchronous এবং Event-driven Programming সহজ করে।
- Functional এবং Declarative কোডিং স্টাইল।
- Operators-এর মাধ্যমে সহজ ডেটা প্রক্রিয়াকরণ।
- Thread Management-এর জন্য Schedulers ব্যবহার।
Observable এর সীমাবদ্ধতা
- শিখতে কিছুটা সময় লাগে।
- Debugging অনেক সময় জটিল হয়ে যায়।
- Subscription সঠিকভাবে dispose() না করলে মেমোরি লিক হতে পারে।
RxJava-তে Observable একটি গুরুত্বপূর্ণ ভূমিকা পালন করে এবং এটি asynchronous programming আরও কার্যকর ও সহজতর করে।
RxJava-তে Observer হলো একটি ইন্টারফেস, যা Observable-এর সাথে কাজ করে। এটি ডেটা স্ট্রিম পর্যবেক্ষণ করে এবং স্ট্রিম থেকে প্রাপ্ত ডেটা বা ইভেন্টগুলোতে প্রতিক্রিয়া জানায়।
Observer-এর ভূমিকা:
- ডেটা স্ট্রিম পর্যবেক্ষণ:
Observer Observable-এর তৈরি ডেটা স্ট্রিম পর্যবেক্ষণ করে এবং নতুন ডেটা বা ইভেন্ট প্রাপ্ত হলে তা গ্রহণ করে। - ইভেন্ট হ্যান্ডলিং:
Observer তিন ধরনের ইভেন্ট পরিচালনা করতে পারে:- onNext(): যখন নতুন ডেটা আসে।
- onError(): যখন কোনো ত্রুটি ঘটে।
- onComplete(): যখন ডেটা স্ট্রিম সম্পূর্ণ হয়।
- রেসপন্স তৈরি:
ডেটা বা ইভেন্টের উপর ভিত্তি করে Observer একটি নির্দিষ্ট অ্যাকশন বা রেসপন্স সম্পাদন করে।
Observer কাজের প্রক্রিয়া:
- Subscription:
Observer একটি Observable-এর সাথে সংযুক্ত হয় (Subscribe করে), এবং এর ডেটা স্ট্রিম বা ইভেন্টগুলো পর্যবেক্ষণ শুরু করে। - ইভেন্ট প্রাপ্তি:
Observable থেকে ডেটা বা ইভেন্ট আসার সাথে সাথে Observer প্রাসঙ্গিক ইভেন্ট মেথড (onNext,onError, বাonComplete) কল করে। - অ্যাকশন সম্পাদন:
প্রতিটি ইভেন্টে নির্ধারিত অ্যাকশন বা অপারেশন সম্পাদিত হয়।
উদাহরণ: Observer-এর কাজ
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class ObserverExample {
public static void main(String[] args) {
// Observable তৈরি
Observable<String> observable = Observable.just("Hello", "RxJava", "World");
// Observer তৈরি
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed to Observable");
}
@Override
public void onNext(String s) {
System.out.println("Received: " + s);
}
@Override
public void onError(Throwable e) {
System.err.println("Error occurred: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("All data received. Completed!");
}
};
// Observable-এ Observer যুক্ত করা
observable.subscribe(observer);
}
}
আউটপুট:
Subscribed to Observable
Received: Hello
Received: RxJava
Received: World
All data received. Completed!
মেথডসমূহের বিস্তারিত:
- onSubscribe(Disposable d):
- Observable-এর সাথে Observer যুক্ত হলে প্রথমে এটি কল হয়।
- এখানে আপনি Subscription ম্যানেজ করতে পারেন (যদি প্রয়োজন হয় Subscription বন্ধ করা)।
- onNext(T t):
- Observable থেকে ডেটা প্রাপ্ত হলে এটি কল হয়।
- প্রত্যেকটি ডেটার জন্য আলাদা করে
onNextকল হয়।
- onError(Throwable e):
- Observable-এ কোনো ত্রুটি ঘটলে এটি কল হয়।
- একবার
onErrorকল হলে, আর কোনো ডেটা বা ইভেন্ট পাঠানো হয় না।
- onComplete():
- Observable থেকে সমস্ত ডেটা পাঠানোর পরে এটি কল হয়।
onCompleteকল হলে, এটি নিশ্চিত করে যে ডেটা স্ট্রিম শেষ হয়েছে।
Observer এবং Observable-এর ইন্টারঅ্যাকশন:
- Observable ডেটা ইমিট (emit) করে।
- Observer সেই ডেটা গ্রহণ করে এবং নির্ধারিত ইভেন্ট হ্যান্ডলারে সাড়া দেয়।
- Subscription-এর মাধ্যমে এই সম্পর্কটি সক্রিয় থাকে।
RxJava-তে Observer গুরুত্বপূর্ণ কারণ এটি অ্যাসিঙ্ক্রোনাস ইভেন্ট ড্রাইভেন প্রোগ্রামিং সহজ করে তোলে।
Cold Observables এবং Hot Observables হল RxJava-এর দুটি গুরুত্বপূর্ণ ধারণা, যা Observable এর আচরণের উপর ভিত্তি করে বিভক্ত। এদের প্রধান পার্থক্য হল ডেটা স্ট্রিম কিভাবে প্রেরিত হয় এবং সাবস্ক্রাইবারদের (Subscribers) সাথে কিভাবে আচরণ করে।
Cold Observable:
- ডেটা স্ট্রিম শুধুমাত্র সাবস্ক্রাইব করার পর শুরু হয়।
যখন কেউ Observable-এ সাবস্ক্রাইব করে, তখন Observable ডেটা এমিট করা শুরু করে। - প্রতিটি সাবস্ক্রাইবার নতুন ডেটা স্ট্রিম পায়।
- Cold Observable-এর ডেটা static থাকে, অর্থাৎ প্রত্যেক সাবস্ক্রাইবারের জন্য ডেটা পুনরায় এমিট হয়।
- উদাহরণ: API কল, ডেটাবেস কোয়েরি, বা ফাইল রিড অপারেশন।
উদাহরণ:
import io.reactivex.Observable;
public class ColdObservableExample {
public static void main(String[] args) {
// Create a Cold Observable
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
// First Subscriber
observable.subscribe(item -> System.out.println("Subscriber 1: " + item));
// Second Subscriber
observable.subscribe(item -> System.out.println("Subscriber 2: " + item));
}
}
আউটপুট:
Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 1: 4
Subscriber 2: 1
Subscriber 2: 2
Subscriber 2: 3
Subscriber 2: 4
Hot Observable:
- ডেটা স্ট্রিম সাবস্ক্রাইবারদের জন্য অপেক্ষা না করে চলতে থাকে।
Observable ডেটা এমিট করতে শুরু করে, এমনকি যদি কোনো সাবস্ক্রাইবার না থাকে। - নতুন সাবস্ক্রাইবাররা তখন থেকেই ডেটা পেতে শুরু করে যখন তারা সাবস্ক্রাইব করে। আগে প্রেরিত ডেটা তারা পায় না।
- Hot Observable সাধারণত লাইভ ডেটা বা event-driven operations-এর জন্য ব্যবহৃত হয়।
- উদাহরণ: Mouse clicks, Keyboard events, বা WebSocket connections।
উদাহরণ:
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
public class HotObservableExample {
public static void main(String[] args) {
// Create a Hot Observable using PublishSubject
PublishSubject<Integer> hotObservable = PublishSubject.create();
// First Subscriber subscribes
hotObservable.subscribe(item -> System.out.println("Subscriber 1: " + item));
// Emit some data
hotObservable.onNext(1);
hotObservable.onNext(2);
// Second Subscriber subscribes
hotObservable.subscribe(item -> System.out.println("Subscriber 2: " + item));
// Emit more data
hotObservable.onNext(3);
hotObservable.onNext(4);
}
}
আউটপুট:
Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 2: 3
Subscriber 1: 4
Subscriber 2: 4
Cold এবং Hot Observable-এর প্রধান পার্থক্য:
| বৈশিষ্ট্য | Cold Observable | Hot Observable |
|---|---|---|
| ডেটা স্ট্রিম শুরু | সাবস্ক্রিপশন-এর পরে শুরু হয়। | সাবস্ক্রিপশন-এর আগে থেকেই চলমান। |
| প্রত্যেক সাবস্ক্রাইবারের জন্য ডেটা | নতুন করে ডেটা এমিট হয়। | সাবস্ক্রাইব করার সময় থেকে ডেটা পায়। |
| ব্যবহারক্ষেত্র | স্ট্যাটিক বা ফিক্সড ডেটার জন্য। | লাইভ ডেটা স্ট্রিমের জন্য। |
| উদাহরণ | API calls, Database queries। | Mouse clicks, WebSocket data। |
Hybrid Observables:
কিছু Observable, যেমন ConnectableObservable, Hot এবং Cold এর মিশ্রণ হিসাবে কাজ করতে পারে। এগুলি .publish() এবং .connect() এর মাধ্যমে কনফিগার করা যায়।
উপসংহার:
Cold Observable ডেটা পুনরায় তৈরি করতে পারে এবং নির্ধারিত ডেটা এমিট করার জন্য কার্যকর, যেখানে Hot Observable বাস্তব-সময়ের ডেটা বা ইভেন্ট পরিচালনার জন্য আদর্শ।
RxJava-তে Observable এবং Observer হল দুইটি প্রধান component। নিচে উদাহরণসহ এই দুটি component কীভাবে তৈরি এবং ব্যবহার করা হয় তা দেখানো হলো।
1. Observable এবং Observer তৈরি:
Observable এমন একটি entity যা ডেটা ইমিট করে। Observer সেই ডেটা consume করে। Observable এবং Observer এর মধ্যে subscription এর মাধ্যমে ডেটা পাঠানো হয়।
উদাহরণ: Observable এবং Observer তৈরি
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class RxJavaExample {
public static void main(String[] args) {
// 1. Create an Observable
Observable<String> observable = Observable.create(emitter -> {
try {
emitter.onNext("Item 1"); // Emit first item
emitter.onNext("Item 2"); // Emit second item
emitter.onNext("Item 3"); // Emit third item
emitter.onComplete(); // Signal completion
} catch (Exception e) {
emitter.onError(e); // Signal an error if any
}
});
// 2. Create an Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed to Observable");
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable e) {
System.out.println("Error occurred: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("All items are received!");
}
};
// 3. Subscribe the Observer to the Observable
observable.subscribe(observer);
}
}
কোড বিশ্লেষণ:
Observable তৈরি:
Observable<String> observable = Observable.create(emitter -> { ... });create()মেথড ব্যবহার করে Observable তৈরি করা হয়েছে।onNext()মেথড ডেটা ইমিট করে।onComplete()ইভেন্ট শেষ হওয়ার সংকেত দেয়।onError()কোনো ত্রুটি হলে তা সংকেত দেয়।
Observer তৈরি:
Observer<String> observer = new Observer<String>() { ... };onSubscribe(): subscription শুরু হলে ডাকা হয়।onNext()ডেটা রিসিভ হলে ডাকা হয়।onError()ত্রুটি থাকলে ডাকা হয়।onComplete()সব ডেটা পাওয়া শেষ হলে ডাকা হয়।
Subscribe করা:
observable.subscribe(observer);- Observer কে Observable এর সাথে সংযুক্ত করা হয়েছে।
2. সংক্ষিপ্ত উদাহরণ (Lambda Expressions):
RxJava এর সাথে Lambda expressions ব্যবহার করে কোডকে আরো সহজ করা যায়।
import io.reactivex.Observable;
public class RxJavaLambdaExample {
public static void main(String[] args) {
// Create an Observable
Observable<String> observable = Observable.just("Hello", "RxJava", "World");
// Subscribe to the Observable using Lambda
observable.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.out.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);
}
}
Lambda কোড বিশ্লেষণ:
Observable.just()একটি Observable তৈরি করে যা ডেটা ইমিট করে।subscribe()এর ভিতরে তিনটি lambda function ব্যবহার করা হয়েছে:- প্রথমটি ডেটা (
onNext) প্রক্রিয়াকরণে, - দ্বিতীয়টি ত্রুটি (
onError) হ্যান্ডেল করতে, - তৃতীয়টি সম্পন্ন হলে (
onComplete) একটি সংকেত পাঠাতে।
- প্রথমটি ডেটা (
আউটপুট:
উদাহরণ 1:
Subscribed to Observable
Received: Item 1
Received: Item 2
Received: Item 3
All items are received!
উদাহরণ 2:
Received: Hello
Received: RxJava
Received: World
Completed!
লক্ষ্য:
- Observable এবং Observer এর কাজ asynchronous এবং event-driven হওয়ায় এটি বড় ডেটা বা real-time ইভেন্ট পরিচালনার জন্য উপযুক্ত।
- RxJava এর Lambda expressions কোডকে concise এবং সহজ করে তোলে।
Read more