RxJava-তে Schedulers এবং Concurrency একটি গুরুত্বপূর্ণ ভূমিকা পালন করে। এগুলি অ্যাসিনক্রোনাস প্রোগ্রামিং সহজ করে এবং ব্যাকগ্রাউন্ড থ্রেডে কাজ করতে সহায়তা করে, যা UI থ্রেড বা মেইন থ্রেডে ব্লকিং এড়ায়।
Schedulers:
Schedulers RxJava-এর একটি উপাদান, যা থ্রেড ব্যবস্থাপনার দায়িত্ব পালন করে। এটি নির্ধারণ করে Observable-এর কাজ কোন থ্রেডে চলবে এবং Observer কোন থ্রেডে সেই কাজ পর্যবেক্ষণ করবে।
Schedulers-এর প্রধান ধরনসমূহ:
- Schedulers.io()
- I/O (Input/Output) সম্পর্কিত কাজের জন্য।
- যেমন: API কল, ডাটাবেস অপারেশন, ফাইল পড়া/লেখা।
- এটি থ্রেড পুল ব্যবহার করে।
- Schedulers.computation()
- CPU-নির্ভর ভারী কাজের জন্য।
- যেমন: গাণিতিক গণনা, ডেটা প্রসেসিং।
- এটি CPU কোরের সংখ্যা অনুযায়ী থ্রেড তৈরি করে।
- Schedulers.newThread()
- প্রতিবার একটি নতুন থ্রেড তৈরি করে।
- খুব কম ব্যবহৃত হয়, কারণ এটি কার্যক্ষমতার দিক থেকে সেরা নয়।
- Schedulers.single()
- একটি সিঙ্গল থ্রেডে কাজ করে।
- সিকুয়েন্সিয়াল বা সিরিজ কাজের জন্য উপযুক্ত।
- AndroidSchedulers.mainThread() (Android-specific)
- UI সম্পর্কিত কাজ মেইন থ্রেডে চালানোর জন্য।
- শুধুমাত্র Android ডেভেলপমেন্টে ব্যবহৃত হয়।
subscribeOn() এবং observeOn():
- subscribeOn()
- নির্ধারণ করে Observable কোন থ্রেডে কাজ শুরু করবে।
- observeOn()
- নির্ধারণ করে Observer কোন থ্রেডে ডেটা পর্যবেক্ষণ এবং প্রসেসিং করবে।
উদাহরণ:
Observable.just("Hello", "RxJava")
.subscribeOn(Schedulers.io()) // Observable I/O থ্রেডে কাজ করবে
.observeOn(Schedulers.computation()) // Observer Computation থ্রেডে ডেটা প্রসেস করবে
.subscribe(item -> System.out.println("Received: " + item));
Concurrency এবং Thread Switching:
RxJava-তে Concurrency সহজেই ব্যবস্থাপনা করা যায় Schedulers ব্যবহার করে। আপনি যেকোনো সময় এক থ্রেড থেকে অন্য থ্রেডে কাজ সুইচ করতে পারবেন।
উদাহরণ: Multiple Thread Switching
Observable.fromArray(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io()) // ডেটা I/O থ্রেডে তৈরি হবে
.map(item -> {
System.out.println("Processing on: " + Thread.currentThread().getName());
return item * 2;
})
.observeOn(Schedulers.computation()) // Computation থ্রেডে ডেটা প্রসেস হবে
.subscribe(item -> {
System.out.println("Received on: " + Thread.currentThread().getName() + " -> " + item);
});
Schedulers ব্যবহারের সেরা অভ্যাস:
- Heavy Workload:
- ভারী কাজের জন্য
Schedulers.io()বাSchedulers.computation()ব্যবহার করুন।
- ভারী কাজের জন্য
- UI Update (Android):
- মেইন থ্রেডে কাজ করার জন্য
AndroidSchedulers.mainThread()ব্যবহার করুন।
- মেইন থ্রেডে কাজ করার জন্য
- Avoid Blocking:
- মেইন থ্রেডে ব্লকিং অপারেশন এড়িয়ে চলুন।
পুরো প্রক্রিয়ার উদাহরণ:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaSchedulerExample {
public static void main(String[] args) {
Observable<String> observable = Observable.create(emitter -> {
System.out.println("Observable thread: " + Thread.currentThread().getName());
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable I/O থ্রেডে কাজ করবে
.observeOn(Schedulers.computation()) // Observer Computation থ্রেডে কাজ করবে
.subscribe(item -> {
System.out.println("Observer thread: " + Thread.currentThread().getName());
System.out.println("Received: " + item);
});
}
}
আউটপুট:
Observable thread: RxCachedThreadScheduler-1
Observer thread: RxComputationThreadPool-1
Received: Hello
Observer thread: RxComputationThreadPool-1
Received: RxJava
Schedulers ব্যবহার না করলে কী হয়?
- ডিফল্টভাবে, Observable এবং Observer উভয়েই বর্তমান থ্রেড (main thread) ব্যবহার করে কাজ করবে।
- এটি Android UI বা ব্যাকগ্রাউন্ড অ্যাসিনক্রোনাস কাজের জন্য প্রায়শই যথেষ্ট নয়।
উপসংহার:
- Schedulers-এর সাহায্যে আপনি সহজেই ডেটা স্ট্রিম বিভিন্ন থ্রেডে পরিচালনা করতে পারবেন।
- Concurrency ব্যবস্থাপনার জন্য এটি একটি কার্যকরী টুল।
- subscribeOn() এবং observeOn() ব্যবহার করে Observable এবং Observer-এর থ্রেড নির্ধারণ করা যায়।
RxJava-তে Schedulers-এর সঠিক ব্যবহার অ্যাপ্লিকেশনের কার্যক্ষমতা এবং স্থিতিশীলতা বাড়াতে সাহায্য করে।
RxJava-তে Schedulers হলো thread management এর জন্য ব্যবহৃত একটি প্রক্রিয়া। এটি বিভিন্ন thread-এ ডেটা প্রসেসিং ও কাজ চালানোর জন্য ব্যবহার করা হয়।
Reactive Programming-এ কাজগুলো asynchronous বা concurrent হওয়ায় সঠিক thread নির্বাচন করা অত্যন্ত গুরুত্বপূর্ণ। Schedulers আমাদের এই সুবিধা দেয় যাতে আমরা কাজগুলো সহজে background thread, computation thread, অথবা main thread-এ পরিচালনা করতে পারি।
Schedulers কেন ব্যবহার করা হয়?
Schedulers-এর প্রধান কাজ হলো:
- Thread Management সহজ করা:
সঠিক thread-এ কাজ নির্ধারণ করা, যেমন UI কাজ main thread-এ এবং ব্যাকগ্রাউন্ড কাজ background thread-এ। - Asynchronous কাজ সহজ করা:
RxJava-তে Observable এবং Observer এর কাজগুলো বিভিন্ন thread-এ চালাতে। - Performance বাড়ানো:
Background threads-এ computational বা I/O-intensive কাজগুলো পরিচালনা করে মূল (main) thread-এর উপর চাপ কমানো। - Thread Switching সহজ করা:
সহজেই ডেটা স্ট্রিম একাধিক thread-এ পরিচালনা করা। উদাহরণস্বরূপ, I/O কাজ background thread-এ এবং UI update main thread-এ।
RxJava-তে Schedulers এর ধরণ:
RxJava বিভিন্ন ধরণের Schedulers প্রদান করে, প্রতিটি ভিন্ন ভিন্ন কাজের জন্য উপযোগী:
- Schedulers.io()
- I/O operations-এর জন্য ব্যবহৃত হয়, যেমন network calls, file operations, database queries।
- এটি একটি thread pool ব্যবহার করে যা অনেকগুলো I/O কাজ পরিচালনা করতে পারে।
- Example: Retrofit API calls।
- Schedulers.computation()
- Computationally heavy কাজের জন্য ব্যবহৃত হয়, যেমন mathematical calculations, data processing।
- এটি CPU cores-এর উপর ভিত্তি করে threads তৈরি করে।
- Schedulers.newThread()
- প্রতিবার একটি নতুন thread তৈরি করে।
- এটি খুব বেশি ব্যবহৃত হয় না কারণ এটি resources-intensive।
- Schedulers.single()
- একটি single thread-এ কাজ চালানোর জন্য ব্যবহৃত হয়।
- এটি sequential এবং ordered কাজের জন্য উপযোগী।
- AndroidSchedulers.mainThread()
- Android UI updates-এর জন্য ব্যবহৃত হয়।
- এটি শুধুমাত্র Android ডেভেলপমেন্টে ব্যবহৃত হয়।
- Schedulers.trampoline()
- একই thread-এ কাজগুলো sequentially চালানোর জন্য ব্যবহৃত হয়।
- এটি recursive কাজের জন্য উপযুক্ত।
Schedulers ব্যবহার করার উদাহরণ:
1. Schedulers.io() Example:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class IoSchedulerExample {
public static void main(String[] args) {
Observable.fromCallable(() -> {
// Simulating a long-running I/O operation
Thread.sleep(1000);
return "I/O Operation Completed!";
})
.subscribeOn(Schedulers.io()) // I/O কাজ background thread-এ
.subscribe(result -> System.out.println(result + " on thread " + Thread.currentThread().getName()));
}
}
আউটপুট:
I/O Operation Completed! on thread RxCachedThreadScheduler-1
2. Schedulers.computation() Example:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ComputationSchedulerExample {
public static void main(String[] args) {
Observable.range(1, 5)
.map(number -> number * number) // Computational কাজ
.subscribeOn(Schedulers.computation()) // Computation thread-এ চালানো
.subscribe(result -> System.out.println(result + " on thread " + Thread.currentThread().getName()));
}
}
আউটপুট:
1 on thread RxComputationThreadPool-1
4 on thread RxComputationThreadPool-1
9 on thread RxComputationThreadPool-1
16 on thread RxComputationThreadPool-1
25 on thread RxComputationThreadPool-1
3. AndroidSchedulers.mainThread() Example (Android):
Observable.just("Hello, RxJava!")
.subscribeOn(Schedulers.io()) // Background thread
.observeOn(AndroidSchedulers.mainThread()) // UI update main thread-এ
.subscribe(result -> textView.setText(result));
subscribeOn() এবং observeOn() এর ভূমিকা
- subscribeOn(Scheduler):
- এটি নির্ধারণ করে যে Observable কোন thread-এ কাজ শুরু করবে।
- একাধিকবার ব্যবহার করলে শুধুমাত্র প্রথমটি কার্যকর হবে।
- observeOn(Scheduler):
- এটি নির্ধারণ করে Observer বা Subscriber কোন thread-এ কাজ করবে।
- একাধিকবার ব্যবহার করলে প্রতিবারই কার্যকর হবে।
Schedulers-এর সুবিধা
- Concurrency সহজ করে।
- Thread Switching-এর মাধ্যমে কাজ পরিচালনা সহজ।
- Performance বৃদ্ধি পায়।
- UI Thread-এ ব্লকিং কাজ এড়ানো যায়।
Schedulers ব্যবহার করার সেরা অনুশীলন (Best Practices):
- ব্যাকগ্রাউন্ড কাজের জন্য Schedulers.io() বা Schedulers.computation() ব্যবহার করুন।
- UI আপডেটের জন্য সর্বদা AndroidSchedulers.mainThread() ব্যবহার করুন।
- dispose() ব্যবহার করে subscription বন্ধ করুন, যাতে memory leaks এড়ানো যায়।
- বড় বা দীর্ঘ-running কাজগুলোর জন্য নতুন thread তৈরি না করে thread pools ব্যবহার করুন।
RxJava-তে Schedulers সঠিকভাবে ব্যবহার করলে আপনার প্রোগ্রামের performance এবং efficiency অনেক বৃদ্ধি পাবে।
RxJava-তে Schedulers হলো থ্রেড ম্যানেজমেন্টের জন্য ব্যবহৃত একটি উপাদান। এটি নির্ধারণ করে যে কোন থ্রেডে কাজটি সম্পন্ন হবে।
RxJava-তে সাধারণত তিনটি গুরুত্বপূর্ণ Scheduler রয়েছে:
- IO Scheduler
- Computation Scheduler
- New Thread Scheduler
এগুলো বিভিন্ন ধরণের অ্যাসিঙ্ক্রোনাস কাজ করার জন্য ব্যবহৃত হয়।
1. IO Scheduler
- উপযুক্ত কাজ:
এটি I/O সম্পর্কিত কাজ যেমন ডেটাবেস কল, REST API রিকোয়েস্ট, ফাইল অপারেশন, বা নেটওয়ার্ক রিকোয়েস্ট-এর জন্য উপযুক্ত। - বিশেষ বৈশিষ্ট্য:
- এটি একটি thread pool ব্যবহার করে।
- প্রতিটি কাজের জন্য আলাদা থ্রেড বরাদ্দ করে।
- থ্রেড পুনঃব্যবহার করতে সক্ষম।
- ব্যবহার:
Schedulers.io()।
উদাহরণ:
Observable.fromCallable(() -> {
// কোনো I/O কাজ (যেমন ফাইল পড়া বা API কল)
Thread.sleep(1000); // Simulating delay
return "Data fetched from API";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println);
2. Computation Scheduler
- উপযুক্ত কাজ:
CPU-ইনটেনসিভ কাজ যেমন ডেটা প্রসেসিং, অ্যালগরিদম চালনা, বা হিসাব-সংক্রান্ত কাজের জন্য। - বিশেষ বৈশিষ্ট্য:
- এটি একটি fixed-size thread pool ব্যবহার করে।
- থ্রেড সংখ্যা ডিভাইসের CPU কোরের সমান।
- এটি I/O সম্পর্কিত কাজের জন্য নয়।
- ব্যবহার:
Schedulers.computation()।
উদাহরণ:
Observable.range(1, 10)
.map(i -> {
System.out.println("Processing on: " + Thread.currentThread().getName());
return i * 2; // Computational task
})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe(result -> System.out.println("Received: " + result));
3. New Thread Scheduler
- উপযুক্ত কাজ:
যখন প্রতিবার একটি নতুন থ্রেড তৈরি করা প্রয়োজন, যেমন স্বতন্ত্র কাজ যেখানে থ্রেড পুনঃব্যবহার করা ঠিক নয়। - বিশেষ বৈশিষ্ট্য:
- এটি প্রতিটি কাজের জন্য একটি নতুন থ্রেড তৈরি করে।
- এটি অনেক বেশি থ্রেড তৈরি করে, যা অতিরিক্ত ব্যয়বহুল হতে পারে।
- ব্যবহার:
Schedulers.newThread()।
উদাহরণ:
Observable.create(emitter -> {
emitter.onNext("Task started");
Thread.sleep(500);
emitter.onNext("Task completed");
emitter.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.single())
.subscribe(System.out::println);
Schedulers-এর সংমিশ্রণ
RxJava-তে আপনি সাধারণত দুটি Scheduler ব্যবহার করেন:
- subscribeOn(Scheduler):
কোন Scheduler-এ Observable কাজ করবে তা নির্ধারণ করে। - observeOn(Scheduler):
কোন Scheduler-এ Observer কাজ করবে তা নির্ধারণ করে।
উদাহরণ:
Observable.just("Task")
.subscribeOn(Schedulers.io()) // কাজ I/O থ্রেডে সম্পন্ন হবে
.observeOn(Schedulers.computation()) // রেজাল্ট computation থ্রেডে প্রসেস হবে
.subscribe(result -> System.out.println("Received on: " + Thread.currentThread().getName()));
তুলনামূলক বিশ্লেষণ
| Scheduler | ব্যবহারযোগ্যতা | থ্রেড ব্যবস্থাপনা | উদাহরণ কাজ |
|---|---|---|---|
| IO Scheduler | I/O সম্পর্কিত কাজ | থ্রেড পুল, থ্রেড পুনঃব্যবহার | REST API, ফাইল পড়া বা লেখা |
| Computation Scheduler | CPU-ইনটেনসিভ কাজ | Fixed-size (CPU cores অনুযায়ী) | ডেটা প্রসেসিং, অ্যালগরিদম চালনা |
| New Thread Scheduler | স্বতন্ত্র নতুন থ্রেড প্রয়োজন | প্রতিবার নতুন থ্রেড তৈরি | নির্দিষ্ট স্বতন্ত্র কাজ |
প্রধান বিষয়:
- IO Scheduler: অধিকাংশ অ্যাসিঙ্ক্রোনাস কাজের জন্য আদর্শ।
- Computation Scheduler: CPU-নির্ভর কাজের জন্য সেরা।
- New Thread Scheduler: বিশেষ পরিস্থিতিতে স্বতন্ত্র নতুন থ্রেড প্রয়োজন হলে ব্যবহার করুন।
Schedulers নির্বাচন করার সময় কাজের ধরন এবং পারফরম্যান্স প্রয়োজনীয়তা বিবেচনা করুন।
RxJava-তে subscribeOn() এবং observeOn() অপারেটর ব্যবহার করে বিভিন্ন থ্রেডে কাজগুলি পরিচালনা করা যায়। এগুলো concurrency এবং threading এর কন্ট্রোল দিতে সাহায্য করে।
subscribeOn() অপারেটর
- কোথায় কাজ করে:
এটি নির্ধারণ করে যে Observable কোন থ্রেডে কাজ করবে বা ডেটা এমিট করবে। - ব্যবহার:
subscribeOn()অপারেটরটি শুধুমাত্র একবার কাজ করে এবং ডেটা এমিশন প্রসেসে প্রথমে প্রয়োগ করা হয়।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Observable thread: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.subscribe(item -> System.out.println("Received: " + item));
আউটপুট:
Observable thread: RxCachedThreadScheduler
Received: 1
observeOn() অপারেটর
- কোথায় কাজ করে:
এটি নির্ধারণ করে যে Observer বা সাবস্ক্রাইবার কোন থ্রেডে কাজ করবে। - ব্যবহার:
observeOn()অপারেটরটি ডেটা প্রক্রিয়াকরণের থ্রেড পরিবর্তন করতে ব্যবহার করা হয়। এটি একাধিকবার প্রয়োগ করা যেতে পারে।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Observable thread: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.observeOn(Schedulers.computation()) // Observer runs on Computation thread
.subscribe(item -> System.out.println("Observer thread: " + Thread.currentThread().getName()));
আউটপুট:
Observable thread: RxCachedThreadScheduler
Observer thread: RxComputationThreadPool
subscribeOn() এবং observeOn() একসাথে ব্যবহার
RxJava-তে এই দুটি অপারেটর একত্রে ব্যবহার করলে, আপনি Observable এবং Observer এর থ্রেড আলাদাভাবে নির্ধারণ করতে পারেন।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Emitting on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Observable runs on IO thread
.observeOn(Schedulers.computation()) // Observer runs on Computation thread
.subscribe(
item -> System.out.println("Processing on: " + Thread.currentThread().getName() + " - Received: " + item)
);
আউটপুট:
Emitting on: RxCachedThreadScheduler
Processing on: RxComputationThreadPool - Received: 1
Processing on: RxComputationThreadPool - Received: 2
মাল্টিপল observeOn() ব্যবহার
observeOn() একাধিকবার ব্যবহার করলে, প্রতিটি পরবর্তী ধাপ আলাদা থ্রেডে চালানো যায়।
উদাহরণ:
Observable<Integer> observable = Observable.create(emitter -> {
System.out.println("Emitting on: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
});
observable
.subscribeOn(Schedulers.io()) // Emission happens on IO thread
.observeOn(Schedulers.computation()) // First observer runs on Computation thread
.map(item -> {
System.out.println("Mapping on: " + Thread.currentThread().getName());
return item * 2;
})
.observeOn(Schedulers.newThread()) // Second observer runs on a New thread
.subscribe(item -> System.out.println("Final processing on: " + Thread.currentThread().getName() + " - Received: " + item));
আউটপুট:
Emitting on: RxCachedThreadScheduler
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 2
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 4
Schedulers সম্পর্কে সংক্ষিপ্ত বিবরণ:
RxJava-তে বিভিন্ন ধরনের Schedulers রয়েছে, যেগুলো থ্রেড ম্যানেজমেন্টে ব্যবহৃত হয়।
Schedulers.io()- IO সম্পর্কিত কাজ (ডাটাবেজ, নেটওয়ার্ক কল) এর জন্য ব্যবহৃত হয়।
- Cached thread pool ব্যবহার করে।
Schedulers.computation()- CPU-ইনটেনসিভ কাজ (কম্পিউটেশন) এর জন্য ব্যবহৃত হয়।
- Fixed thread pool ব্যবহার করে (CPU cores এর উপর ভিত্তি করে)।
Schedulers.newThread()- প্রতিবার একটি নতুন থ্রেড তৈরি করে।
AndroidSchedulers.mainThread()(শুধু Android-এর জন্য)- Main UI thread-এ কাজ পরিচালনা করার জন্য ব্যবহৃত হয়।
উপসংহার:
subscribeOn()অপারেটর ব্যবহার করে আপনি Observable এর থ্রেড নির্ধারণ করতে পারেন।observeOn()অপারেটর Observer এর থ্রেড নিয়ন্ত্রণ করতে সাহায্য করে।- একসাথে ব্যবহার করে আপনি একাধিক থ্রেডে কাজের লজিক তৈরি করতে পারেন।
এটি asynchronous এবং reactive programming-এ থ্রেড ম্যানেজমেন্টকে অনেক সহজ এবং কার্যকর করে তোলে।
RxJava-তে Schedulers এবং Concurrency ব্যবহার করে asynchronous এবং parallel কাজ সহজে পরিচালনা করা যায়। Schedulers মূলত RxJava-কে বলে দেয় কোন থ্রেডে কাজটি সম্পন্ন করতে হবে। এটি ব্যাকগ্রাউন্ড থ্রেডে computational বা I/O কাজ পরিচালনা এবং UI থ্রেডে রেজাল্ট আপডেট করতে ব্যবহৃত হয়।
Schedulers এর ধরন
RxJava বিভিন্ন ধরনের Schedulers সরবরাহ করে, যেমন:
- Schedulers.io():
- I/O অপারেশন (e.g., নেটওয়ার্ক কল, ফাইল অপারেশন) পরিচালনার জন্য ব্যবহৃত হয়।
- এটি থ্রেড পুল ব্যবহার করে।
- Schedulers.computation():
- Computational কাজ (e.g., ডেটা প্রসেসিং, algorithm) পরিচালনার জন্য ব্যবহৃত হয়।
- এটি CPU-এর core অনুযায়ী থ্রেড তৈরি করে।
- Schedulers.newThread():
- প্রতিবার একটি নতুন থ্রেড তৈরি করে।
- Schedulers.single():
- একটি একক থ্রেডে কাজ সম্পন্ন করে।
- AndroidSchedulers.mainThread() (Android এ ব্যবহৃত):
- UI থ্রেডে কাজ সম্পন্ন করে।
উদাহরণ 1: Schedulers ব্যবহার
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaSchedulersExample {
public static void main(String[] args) throws InterruptedException {
// Create an Observable
Observable<String> observable = Observable.create(emitter -> {
System.out.println("Observable is emitting on: " + Thread.currentThread().getName());
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});
// Subscribe with Schedulers
observable
.subscribeOn(Schedulers.io()) // Specify the thread for Observable
.observeOn(Schedulers.computation()) // Specify the thread for Observer
.subscribe(
item -> System.out.println("Received: " + item + " on " + Thread.currentThread().getName()),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed on " + Thread.currentThread().getName())
);
// Sleep to allow asynchronous operation to complete
Thread.sleep(1000);
}
}
কোড বিশ্লেষণ:
- subscribeOn(Schedulers.io()):
- Observable-এ কাজ
ioথ্রেডে সম্পন্ন হবে।
- Observable-এ কাজ
- observeOn(Schedulers.computation()):
- Observer-এ কাজ
computationথ্রেডে সম্পন্ন হবে।
- Observer-এ কাজ
- Thread.sleep(1000):
- Asynchronous কাজ শেষ হওয়ার জন্য মেইন থ্রেডকে অপেক্ষা করতে দেয়া হয়েছে।
আউটপুট:
Observable is emitting on: RxCachedThreadScheduler-1
Received: Hello on RxComputationThreadPool-1
Received: RxJava on RxComputationThreadPool-1
Completed on RxComputationThreadPool-1
উদাহরণ 2: Schedulers ব্যবহার করে Parallel Execution
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaParallelExample {
public static void main(String[] args) throws InterruptedException {
// Observable emitting multiple items
Observable.range(1, 5)
.flatMap(number ->
Observable.just(number)
.subscribeOn(Schedulers.computation()) // Parallel execution
.map(item -> {
System.out.println("Processing item " + item + " on " + Thread.currentThread().getName());
return item * 2; // Simulate processing
})
)
.observeOn(Schedulers.single()) // Collect results on a single thread
.subscribe(
result -> System.out.println("Result: " + result + " on " + Thread.currentThread().getName()),
error -> System.err.println("Error: " + error),
() -> System.out.println("All items processed!")
);
// Sleep to allow asynchronous operation to complete
Thread.sleep(2000);
}
}
কোড বিশ্লেষণ:
- flatMap():
- প্রতিটি
numberআলাদা আলাদা থ্রেডে parallelভাবে প্রক্রিয়াকরণ করা হয়।
- প্রতিটি
- Schedulers.computation():
- Computational কাজ parallelভাবে সম্পন্ন করতে ব্যবহৃত হয়েছে।
- Schedulers.single():
- Observer এর কাজ একটি single থ্রেডে সমাপ্ত হয়।
আউটপুট:
Processing item 1 on RxComputationThreadPool-1
Processing item 2 on RxComputationThreadPool-2
Processing item 3 on RxComputationThreadPool-3
Processing item 4 on RxComputationThreadPool-4
Processing item 5 on RxComputationThreadPool-5
Result: 2 on RxSingleScheduler-1
Result: 4 on RxSingleScheduler-1
Result: 6 on RxSingleScheduler-1
Result: 8 on RxSingleScheduler-1
Result: 10 on RxSingleScheduler-1
All items processed!
Schedulers এর ব্যবহার ক্ষেত্র:
- Background Tasks:
- Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে
Schedulers.computation()।
- Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে
- Network Calls:
- API কল বা ডেটাবেস অপারেশন পরিচালনায়
Schedulers.io()।
- API কল বা ডেটাবেস অপারেশন পরিচালনায়
- UI Updates:
- Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে
AndroidSchedulers.mainThread()।
- Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে
RxJava-র Schedulers asynchronous এবং parallel কাজগুলো সহজ ও কার্যকরভাবে পরিচালনার জন্য অপরিহার্য। উপরের উদাহরণগুলো ব্যবহার করে বিভিন্ন কাজের জন্য সঠিক Schedulers নির্বাচন করা শিখুন।
Read more