subscribeOn() এবং observeOn() এর মাধ্যমে Threading কন্ট্রোল করা

Schedulers এবং Concurrency - আরএক্সজাভা (RxJava) - Java Technologies

252

RxJava-তে subscribeOn() এবং observeOn() অপারেটর ব্যবহার করে বিভিন্ন থ্রেডে কাজগুলি পরিচালনা করা যায়। এগুলো concurrency এবং threading এর কন্ট্রোল দিতে সাহায্য করে।


subscribeOn() অপারেটর

  • কোথায় কাজ করে:
    এটি নির্ধারণ করে যে Observable কোন থ্রেডে কাজ করবে বা ডেটা এমিট করবে।
  • ব্যবহার:
    subscribeOn() অপারেটরটি শুধুমাত্র একবার কাজ করে এবং ডেটা এমিশন প্রসেসে প্রথমে প্রয়োগ করা হয়।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Observable thread: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io()) // Observable runs on IO thread
    .subscribe(item -> System.out.println("Received: " + item));

আউটপুট:

Observable thread: RxCachedThreadScheduler
Received: 1

observeOn() অপারেটর

  • কোথায় কাজ করে:
    এটি নির্ধারণ করে যে Observer বা সাবস্ক্রাইবার কোন থ্রেডে কাজ করবে।
  • ব্যবহার:
    observeOn() অপারেটরটি ডেটা প্রক্রিয়াকরণের থ্রেড পরিবর্তন করতে ব্যবহার করা হয়। এটি একাধিকবার প্রয়োগ করা যেতে পারে।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Observable thread: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())              // Observable runs on IO thread
    .observeOn(Schedulers.computation())       // Observer runs on Computation thread
    .subscribe(item -> System.out.println("Observer thread: " + Thread.currentThread().getName()));

আউটপুট:

Observable thread: RxCachedThreadScheduler
Observer thread: RxComputationThreadPool

subscribeOn() এবং observeOn() একসাথে ব্যবহার

RxJava-তে এই দুটি অপারেটর একত্রে ব্যবহার করলে, আপনি Observable এবং Observer এর থ্রেড আলাদাভাবে নির্ধারণ করতে পারেন।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Emitting on: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())               // Observable runs on IO thread
    .observeOn(Schedulers.computation())        // Observer runs on Computation thread
    .subscribe(
        item -> System.out.println("Processing on: " + Thread.currentThread().getName() + " - Received: " + item)
    );

আউটপুট:

Emitting on: RxCachedThreadScheduler
Processing on: RxComputationThreadPool - Received: 1
Processing on: RxComputationThreadPool - Received: 2

মাল্টিপল observeOn() ব্যবহার

observeOn() একাধিকবার ব্যবহার করলে, প্রতিটি পরবর্তী ধাপ আলাদা থ্রেডে চালানো যায়।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Emitting on: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())                // Emission happens on IO thread
    .observeOn(Schedulers.computation())         // First observer runs on Computation thread
    .map(item -> {
        System.out.println("Mapping on: " + Thread.currentThread().getName());
        return item * 2;
    })
    .observeOn(Schedulers.newThread())           // Second observer runs on a New thread
    .subscribe(item -> System.out.println("Final processing on: " + Thread.currentThread().getName() + " - Received: " + item));

আউটপুট:

Emitting on: RxCachedThreadScheduler
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 2
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 4

Schedulers সম্পর্কে সংক্ষিপ্ত বিবরণ:

RxJava-তে বিভিন্ন ধরনের Schedulers রয়েছে, যেগুলো থ্রেড ম্যানেজমেন্টে ব্যবহৃত হয়।

  1. Schedulers.io()
    • IO সম্পর্কিত কাজ (ডাটাবেজ, নেটওয়ার্ক কল) এর জন্য ব্যবহৃত হয়।
    • Cached thread pool ব্যবহার করে।
  2. Schedulers.computation()
    • CPU-ইনটেনসিভ কাজ (কম্পিউটেশন) এর জন্য ব্যবহৃত হয়।
    • Fixed thread pool ব্যবহার করে (CPU cores এর উপর ভিত্তি করে)।
  3. Schedulers.newThread()
    • প্রতিবার একটি নতুন থ্রেড তৈরি করে।
  4. AndroidSchedulers.mainThread() (শুধু Android-এর জন্য)
    • Main UI thread-এ কাজ পরিচালনা করার জন্য ব্যবহৃত হয়।

উপসংহার:

  • subscribeOn() অপারেটর ব্যবহার করে আপনি Observable এর থ্রেড নির্ধারণ করতে পারেন।
  • observeOn() অপারেটর Observer এর থ্রেড নিয়ন্ত্রণ করতে সাহায্য করে।
  • একসাথে ব্যবহার করে আপনি একাধিক থ্রেডে কাজের লজিক তৈরি করতে পারেন।

এটি asynchronous এবং reactive programming-এ থ্রেড ম্যানেজমেন্টকে অনেক সহজ এবং কার্যকর করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...