RxJava-তে Schedulers এবং Concurrency ব্যবহার করে asynchronous এবং parallel কাজ সহজে পরিচালনা করা যায়। Schedulers মূলত RxJava-কে বলে দেয় কোন থ্রেডে কাজটি সম্পন্ন করতে হবে। এটি ব্যাকগ্রাউন্ড থ্রেডে computational বা I/O কাজ পরিচালনা এবং UI থ্রেডে রেজাল্ট আপডেট করতে ব্যবহৃত হয়।
Schedulers এর ধরন
RxJava বিভিন্ন ধরনের Schedulers সরবরাহ করে, যেমন:
- Schedulers.io():
- I/O অপারেশন (e.g., নেটওয়ার্ক কল, ফাইল অপারেশন) পরিচালনার জন্য ব্যবহৃত হয়।
- এটি থ্রেড পুল ব্যবহার করে।
- Schedulers.computation():
- Computational কাজ (e.g., ডেটা প্রসেসিং, algorithm) পরিচালনার জন্য ব্যবহৃত হয়।
- এটি CPU-এর core অনুযায়ী থ্রেড তৈরি করে।
- Schedulers.newThread():
- প্রতিবার একটি নতুন থ্রেড তৈরি করে।
- Schedulers.single():
- একটি একক থ্রেডে কাজ সম্পন্ন করে।
- AndroidSchedulers.mainThread() (Android এ ব্যবহৃত):
- UI থ্রেডে কাজ সম্পন্ন করে।
উদাহরণ 1: Schedulers ব্যবহার
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaSchedulersExample {
public static void main(String[] args) throws InterruptedException {
// Create an Observable
Observable<String> observable = Observable.create(emitter -> {
System.out.println("Observable is emitting on: " + Thread.currentThread().getName());
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});
// Subscribe with Schedulers
observable
.subscribeOn(Schedulers.io()) // Specify the thread for Observable
.observeOn(Schedulers.computation()) // Specify the thread for Observer
.subscribe(
item -> System.out.println("Received: " + item + " on " + Thread.currentThread().getName()),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed on " + Thread.currentThread().getName())
);
// Sleep to allow asynchronous operation to complete
Thread.sleep(1000);
}
}
কোড বিশ্লেষণ:
- subscribeOn(Schedulers.io()):
- Observable-এ কাজ
ioথ্রেডে সম্পন্ন হবে।
- Observable-এ কাজ
- observeOn(Schedulers.computation()):
- Observer-এ কাজ
computationথ্রেডে সম্পন্ন হবে।
- Observer-এ কাজ
- Thread.sleep(1000):
- Asynchronous কাজ শেষ হওয়ার জন্য মেইন থ্রেডকে অপেক্ষা করতে দেয়া হয়েছে।
আউটপুট:
Observable is emitting on: RxCachedThreadScheduler-1
Received: Hello on RxComputationThreadPool-1
Received: RxJava on RxComputationThreadPool-1
Completed on RxComputationThreadPool-1
উদাহরণ 2: Schedulers ব্যবহার করে Parallel Execution
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaParallelExample {
public static void main(String[] args) throws InterruptedException {
// Observable emitting multiple items
Observable.range(1, 5)
.flatMap(number ->
Observable.just(number)
.subscribeOn(Schedulers.computation()) // Parallel execution
.map(item -> {
System.out.println("Processing item " + item + " on " + Thread.currentThread().getName());
return item * 2; // Simulate processing
})
)
.observeOn(Schedulers.single()) // Collect results on a single thread
.subscribe(
result -> System.out.println("Result: " + result + " on " + Thread.currentThread().getName()),
error -> System.err.println("Error: " + error),
() -> System.out.println("All items processed!")
);
// Sleep to allow asynchronous operation to complete
Thread.sleep(2000);
}
}
কোড বিশ্লেষণ:
- flatMap():
- প্রতিটি
numberআলাদা আলাদা থ্রেডে parallelভাবে প্রক্রিয়াকরণ করা হয়।
- প্রতিটি
- Schedulers.computation():
- Computational কাজ parallelভাবে সম্পন্ন করতে ব্যবহৃত হয়েছে।
- Schedulers.single():
- Observer এর কাজ একটি single থ্রেডে সমাপ্ত হয়।
আউটপুট:
Processing item 1 on RxComputationThreadPool-1
Processing item 2 on RxComputationThreadPool-2
Processing item 3 on RxComputationThreadPool-3
Processing item 4 on RxComputationThreadPool-4
Processing item 5 on RxComputationThreadPool-5
Result: 2 on RxSingleScheduler-1
Result: 4 on RxSingleScheduler-1
Result: 6 on RxSingleScheduler-1
Result: 8 on RxSingleScheduler-1
Result: 10 on RxSingleScheduler-1
All items processed!
Schedulers এর ব্যবহার ক্ষেত্র:
- Background Tasks:
- Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে
Schedulers.computation()।
- Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে
- Network Calls:
- API কল বা ডেটাবেস অপারেশন পরিচালনায়
Schedulers.io()।
- API কল বা ডেটাবেস অপারেশন পরিচালনায়
- UI Updates:
- Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে
AndroidSchedulers.mainThread()।
- Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে
RxJava-র Schedulers asynchronous এবং parallel কাজগুলো সহজ ও কার্যকরভাবে পরিচালনার জন্য অপরিহার্য। উপরের উদাহরণগুলো ব্যবহার করে বিভিন্ন কাজের জন্য সঠিক Schedulers নির্বাচন করা শিখুন।
Content added By
Read more