RxJava-তে subscribeOn() এবং observeOn() অপারেটর ব্যবহার করে বিভিন্ন থ্রেডে কাজগুলি পরিচালনা করা যায়। এগুলো concurrency এবং threading এর কন্ট্রোল দিতে সাহায্য করে।
subscribeOn() অপারেটর
- কোথায় কাজ করে:
এটি নির্ধারণ করে যে Observable কোন থ্রেডে কাজ করবে বা ডেটা এমিট করবে। - ব্যবহার:
subscribeOn()অপারেটরটি শুধুমাত্র একবার কাজ করে এবং ডেটা এমিশন প্রসেসে প্রথমে প্রয়োগ করা হয়।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Observable thread: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.subscribe(item -> System.out.println("Received: " + item));
আউটপুট:
Observable thread: RxCachedThreadScheduler
Received: 1
observeOn() অপারেটর
- কোথায় কাজ করে:
এটি নির্ধারণ করে যে Observer বা সাবস্ক্রাইবার কোন থ্রেডে কাজ করবে। - ব্যবহার:
observeOn()অপারেটরটি ডেটা প্রক্রিয়াকরণের থ্রেড পরিবর্তন করতে ব্যবহার করা হয়। এটি একাধিকবার প্রয়োগ করা যেতে পারে।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Observable thread: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.observeOn(Schedulers.computation()) // Observer runs on Computation thread
.subscribe(item -> System.out.println("Observer thread: " + Thread.currentThread().getName()));
আউটপুট:
Observable thread: RxCachedThreadScheduler
Observer thread: RxComputationThreadPool
subscribeOn() এবং observeOn() একসাথে ব্যবহার
RxJava-তে এই দুটি অপারেটর একত্রে ব্যবহার করলে, আপনি Observable এবং Observer এর থ্রেড আলাদাভাবে নির্ধারণ করতে পারেন।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Emitting on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.observeOn(Schedulers.computation()) // Observer runs on Computation thread
.subscribe(
item -> System.out.println("Processing on: " + Thread.currentThread().getName() + " - Received: " + item)
);
আউটপুট:
Emitting on: RxCachedThreadScheduler
Processing on: RxComputationThreadPool - Received: 1
Processing on: RxComputationThreadPool - Received: 2
মাল্টিপল observeOn() ব্যবহার
observeOn() একাধিকবার ব্যবহার করলে, প্রতিটি পরবর্তী ধাপ আলাদা থ্রেডে চালানো যায়।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Emitting on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Emission happens on IO thread
.observeOn(Schedulers.computation()) // First observer runs on Computation thread
.map(item -> {
System.out.println("Mapping on: " + Thread.currentThread().getName());
return item * 2;
})
.observeOn(Schedulers.newThread()) // Second observer runs on a New thread
.subscribe(item -> System.out.println("Final processing on: " + Thread.currentThread().getName() + " - Received: " + item));
আউটপুট:
Emitting on: RxCachedThreadScheduler
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 2
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 4
Schedulers সম্পর্কে সংক্ষিপ্ত বিবরণ:
RxJava-তে বিভিন্ন ধরনের Schedulers রয়েছে, যেগুলো থ্রেড ম্যানেজমেন্টে ব্যবহৃত হয়।
Schedulers.io()- IO সম্পর্কিত কাজ (ডাটাবেজ, নেটওয়ার্ক কল) এর জন্য ব্যবহৃত হয়।
- Cached thread pool ব্যবহার করে।
Schedulers.computation()- CPU-ইনটেনসিভ কাজ (কম্পিউটেশন) এর জন্য ব্যবহৃত হয়।
- Fixed thread pool ব্যবহার করে (CPU cores এর উপর ভিত্তি করে)।
Schedulers.newThread()- প্রতিবার একটি নতুন থ্রেড তৈরি করে।
AndroidSchedulers.mainThread()(শুধু Android-এর জন্য)- Main UI thread-এ কাজ পরিচালনা করার জন্য ব্যবহৃত হয়।
উপসংহার:
subscribeOn()অপারেটর ব্যবহার করে আপনি Observable এর থ্রেড নির্ধারণ করতে পারেন।observeOn()অপারেটর Observer এর থ্রেড নিয়ন্ত্রণ করতে সাহায্য করে।- একসাথে ব্যবহার করে আপনি একাধিক থ্রেডে কাজের লজিক তৈরি করতে পারেন।
এটি asynchronous এবং reactive programming-এ থ্রেড ম্যানেজমেন্টকে অনেক সহজ এবং কার্যকর করে তোলে।
Read more