Schedulers এবং Concurrency

আরএক্সজাভা (RxJava) - Java Technologies

307

RxJava-তে Schedulers এবং Concurrency একটি গুরুত্বপূর্ণ ভূমিকা পালন করে। এগুলি অ্যাসিনক্রোনাস প্রোগ্রামিং সহজ করে এবং ব্যাকগ্রাউন্ড থ্রেডে কাজ করতে সহায়তা করে, যা UI থ্রেড বা মেইন থ্রেডে ব্লকিং এড়ায়।


Schedulers:

Schedulers RxJava-এর একটি উপাদান, যা থ্রেড ব্যবস্থাপনার দায়িত্ব পালন করে। এটি নির্ধারণ করে Observable-এর কাজ কোন থ্রেডে চলবে এবং Observer কোন থ্রেডে সেই কাজ পর্যবেক্ষণ করবে।

Schedulers-এর প্রধান ধরনসমূহ:

  1. Schedulers.io()
    • I/O (Input/Output) সম্পর্কিত কাজের জন্য।
    • যেমন: API কল, ডাটাবেস অপারেশন, ফাইল পড়া/লেখা।
    • এটি থ্রেড পুল ব্যবহার করে।
  2. Schedulers.computation()
    • CPU-নির্ভর ভারী কাজের জন্য।
    • যেমন: গাণিতিক গণনা, ডেটা প্রসেসিং।
    • এটি CPU কোরের সংখ্যা অনুযায়ী থ্রেড তৈরি করে।
  3. Schedulers.newThread()
    • প্রতিবার একটি নতুন থ্রেড তৈরি করে।
    • খুব কম ব্যবহৃত হয়, কারণ এটি কার্যক্ষমতার দিক থেকে সেরা নয়।
  4. Schedulers.single()
    • একটি সিঙ্গল থ্রেডে কাজ করে।
    • সিকুয়েন্সিয়াল বা সিরিজ কাজের জন্য উপযুক্ত।
  5. AndroidSchedulers.mainThread() (Android-specific)
    • UI সম্পর্কিত কাজ মেইন থ্রেডে চালানোর জন্য।
    • শুধুমাত্র Android ডেভেলপমেন্টে ব্যবহৃত হয়।

subscribeOn() এবং observeOn():

  1. subscribeOn()
    • নির্ধারণ করে Observable কোন থ্রেডে কাজ শুরু করবে।
  2. observeOn()
    • নির্ধারণ করে Observer কোন থ্রেডে ডেটা পর্যবেক্ষণ এবং প্রসেসিং করবে।

উদাহরণ:

Observable.just("Hello", "RxJava")
    .subscribeOn(Schedulers.io())          // Observable I/O থ্রেডে কাজ করবে
    .observeOn(Schedulers.computation())  // Observer Computation থ্রেডে ডেটা প্রসেস করবে
    .subscribe(item -> System.out.println("Received: " + item));

Concurrency এবং Thread Switching:

RxJava-তে Concurrency সহজেই ব্যবস্থাপনা করা যায় Schedulers ব্যবহার করে। আপনি যেকোনো সময় এক থ্রেড থেকে অন্য থ্রেডে কাজ সুইচ করতে পারবেন।

উদাহরণ: Multiple Thread Switching

Observable.fromArray(1, 2, 3, 4, 5)
    .subscribeOn(Schedulers.io())          // ডেটা I/O থ্রেডে তৈরি হবে
    .map(item -> {
        System.out.println("Processing on: " + Thread.currentThread().getName());
        return item * 2;
    })
    .observeOn(Schedulers.computation())   // Computation থ্রেডে ডেটা প্রসেস হবে
    .subscribe(item -> {
        System.out.println("Received on: " + Thread.currentThread().getName() + " -> " + item);
    });

Schedulers ব্যবহারের সেরা অভ্যাস:

  1. Heavy Workload:
    • ভারী কাজের জন্য Schedulers.io() বা Schedulers.computation() ব্যবহার করুন।
  2. UI Update (Android):
    • মেইন থ্রেডে কাজ করার জন্য AndroidSchedulers.mainThread() ব্যবহার করুন।
  3. Avoid Blocking:
    • মেইন থ্রেডে ব্লকিং অপারেশন এড়িয়ে চলুন।

পুরো প্রক্রিয়ার উদাহরণ:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaSchedulerExample {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            System.out.println("Observable thread: " + Thread.currentThread().getName());
            emitter.onNext("Hello");
            emitter.onNext("RxJava");
            emitter.onComplete();
        });

        observable
            .subscribeOn(Schedulers.io())             // Observable I/O থ্রেডে কাজ করবে
            .observeOn(Schedulers.computation())      // Observer Computation থ্রেডে কাজ করবে
            .subscribe(item -> {
                System.out.println("Observer thread: " + Thread.currentThread().getName());
                System.out.println("Received: " + item);
            });
    }
}

আউটপুট:

Observable thread: RxCachedThreadScheduler-1
Observer thread: RxComputationThreadPool-1
Received: Hello
Observer thread: RxComputationThreadPool-1
Received: RxJava

Schedulers ব্যবহার না করলে কী হয়?

  • ডিফল্টভাবে, Observable এবং Observer উভয়েই বর্তমান থ্রেড (main thread) ব্যবহার করে কাজ করবে।
  • এটি Android UI বা ব্যাকগ্রাউন্ড অ্যাসিনক্রোনাস কাজের জন্য প্রায়শই যথেষ্ট নয়।

উপসংহার:

  • Schedulers-এর সাহায্যে আপনি সহজেই ডেটা স্ট্রিম বিভিন্ন থ্রেডে পরিচালনা করতে পারবেন।
  • Concurrency ব্যবস্থাপনার জন্য এটি একটি কার্যকরী টুল।
  • subscribeOn() এবং observeOn() ব্যবহার করে Observable এবং Observer-এর থ্রেড নির্ধারণ করা যায়।

RxJava-তে Schedulers-এর সঠিক ব্যবহার অ্যাপ্লিকেশনের কার্যক্ষমতা এবং স্থিতিশীলতা বাড়াতে সাহায্য করে।

Content added By

RxJava-তে Schedulers হলো thread management এর জন্য ব্যবহৃত একটি প্রক্রিয়া। এটি বিভিন্ন thread-এ ডেটা প্রসেসিং ও কাজ চালানোর জন্য ব্যবহার করা হয়।

Reactive Programming-এ কাজগুলো asynchronous বা concurrent হওয়ায় সঠিক thread নির্বাচন করা অত্যন্ত গুরুত্বপূর্ণ। Schedulers আমাদের এই সুবিধা দেয় যাতে আমরা কাজগুলো সহজে background thread, computation thread, অথবা main thread-এ পরিচালনা করতে পারি।


Schedulers কেন ব্যবহার করা হয়?

Schedulers-এর প্রধান কাজ হলো:

  1. Thread Management সহজ করা:
    সঠিক thread-এ কাজ নির্ধারণ করা, যেমন UI কাজ main thread-এ এবং ব্যাকগ্রাউন্ড কাজ background thread-এ।
  2. Asynchronous কাজ সহজ করা:
    RxJava-তে Observable এবং Observer এর কাজগুলো বিভিন্ন thread-এ চালাতে।
  3. Performance বাড়ানো:
    Background threads-এ computational বা I/O-intensive কাজগুলো পরিচালনা করে মূল (main) thread-এর উপর চাপ কমানো।
  4. Thread Switching সহজ করা:
    সহজেই ডেটা স্ট্রিম একাধিক thread-এ পরিচালনা করা। উদাহরণস্বরূপ, I/O কাজ background thread-এ এবং UI update main thread-এ।

RxJava-তে Schedulers এর ধরণ:

RxJava বিভিন্ন ধরণের Schedulers প্রদান করে, প্রতিটি ভিন্ন ভিন্ন কাজের জন্য উপযোগী:

  1. Schedulers.io()
    • I/O operations-এর জন্য ব্যবহৃত হয়, যেমন network calls, file operations, database queries।
    • এটি একটি thread pool ব্যবহার করে যা অনেকগুলো I/O কাজ পরিচালনা করতে পারে।
    • Example: Retrofit API calls।
  2. Schedulers.computation()
    • Computationally heavy কাজের জন্য ব্যবহৃত হয়, যেমন mathematical calculations, data processing।
    • এটি CPU cores-এর উপর ভিত্তি করে threads তৈরি করে।
  3. Schedulers.newThread()
    • প্রতিবার একটি নতুন thread তৈরি করে।
    • এটি খুব বেশি ব্যবহৃত হয় না কারণ এটি resources-intensive।
  4. Schedulers.single()
    • একটি single thread-এ কাজ চালানোর জন্য ব্যবহৃত হয়।
    • এটি sequential এবং ordered কাজের জন্য উপযোগী।
  5. AndroidSchedulers.mainThread()
    • Android UI updates-এর জন্য ব্যবহৃত হয়।
    • এটি শুধুমাত্র Android ডেভেলপমেন্টে ব্যবহৃত হয়।
  6. Schedulers.trampoline()
    • একই thread-এ কাজগুলো sequentially চালানোর জন্য ব্যবহৃত হয়।
    • এটি recursive কাজের জন্য উপযুক্ত।

Schedulers ব্যবহার করার উদাহরণ:

1. Schedulers.io() Example:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class IoSchedulerExample {
    public static void main(String[] args) {
        Observable.fromCallable(() -> {
            // Simulating a long-running I/O operation
            Thread.sleep(1000);
            return "I/O Operation Completed!";
        })
        .subscribeOn(Schedulers.io()) // I/O কাজ background thread-এ
        .subscribe(result -> System.out.println(result + " on thread " + Thread.currentThread().getName()));
    }
}

আউটপুট:

I/O Operation Completed! on thread RxCachedThreadScheduler-1

2. Schedulers.computation() Example:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ComputationSchedulerExample {
    public static void main(String[] args) {
        Observable.range(1, 5)
            .map(number -> number * number) // Computational কাজ
            .subscribeOn(Schedulers.computation()) // Computation thread-এ চালানো
            .subscribe(result -> System.out.println(result + " on thread " + Thread.currentThread().getName()));
    }
}

আউটপুট:

1 on thread RxComputationThreadPool-1  
4 on thread RxComputationThreadPool-1  
9 on thread RxComputationThreadPool-1  
16 on thread RxComputationThreadPool-1  
25 on thread RxComputationThreadPool-1  

3. AndroidSchedulers.mainThread() Example (Android):

Observable.just("Hello, RxJava!")
    .subscribeOn(Schedulers.io()) // Background thread
    .observeOn(AndroidSchedulers.mainThread()) // UI update main thread-এ
    .subscribe(result -> textView.setText(result));

subscribeOn() এবং observeOn() এর ভূমিকা

  1. subscribeOn(Scheduler):
    • এটি নির্ধারণ করে যে Observable কোন thread-এ কাজ শুরু করবে।
    • একাধিকবার ব্যবহার করলে শুধুমাত্র প্রথমটি কার্যকর হবে।
  2. observeOn(Scheduler):
    • এটি নির্ধারণ করে Observer বা Subscriber কোন thread-এ কাজ করবে।
    • একাধিকবার ব্যবহার করলে প্রতিবারই কার্যকর হবে।

Schedulers-এর সুবিধা

  1. Concurrency সহজ করে।
  2. Thread Switching-এর মাধ্যমে কাজ পরিচালনা সহজ।
  3. Performance বৃদ্ধি পায়।
  4. UI Thread-এ ব্লকিং কাজ এড়ানো যায়।

Schedulers ব্যবহার করার সেরা অনুশীলন (Best Practices):

  1. ব্যাকগ্রাউন্ড কাজের জন্য Schedulers.io() বা Schedulers.computation() ব্যবহার করুন।
  2. UI আপডেটের জন্য সর্বদা AndroidSchedulers.mainThread() ব্যবহার করুন।
  3. dispose() ব্যবহার করে subscription বন্ধ করুন, যাতে memory leaks এড়ানো যায়।
  4. বড় বা দীর্ঘ-running কাজগুলোর জন্য নতুন thread তৈরি না করে thread pools ব্যবহার করুন।

RxJava-তে Schedulers সঠিকভাবে ব্যবহার করলে আপনার প্রোগ্রামের performance এবং efficiency অনেক বৃদ্ধি পাবে।

Content added By

RxJava-তে Schedulers হলো থ্রেড ম্যানেজমেন্টের জন্য ব্যবহৃত একটি উপাদান। এটি নির্ধারণ করে যে কোন থ্রেডে কাজটি সম্পন্ন হবে।

RxJava-তে সাধারণত তিনটি গুরুত্বপূর্ণ Scheduler রয়েছে:

  1. IO Scheduler
  2. Computation Scheduler
  3. New Thread Scheduler

এগুলো বিভিন্ন ধরণের অ্যাসিঙ্ক্রোনাস কাজ করার জন্য ব্যবহৃত হয়।


1. IO Scheduler

  • উপযুক্ত কাজ:
    এটি I/O সম্পর্কিত কাজ যেমন ডেটাবেস কল, REST API রিকোয়েস্ট, ফাইল অপারেশন, বা নেটওয়ার্ক রিকোয়েস্ট-এর জন্য উপযুক্ত।
  • বিশেষ বৈশিষ্ট্য:
    • এটি একটি thread pool ব্যবহার করে।
    • প্রতিটি কাজের জন্য আলাদা থ্রেড বরাদ্দ করে।
    • থ্রেড পুনঃব্যবহার করতে সক্ষম।
  • ব্যবহার:
    Schedulers.io()

উদাহরণ:

Observable.fromCallable(() -> {
    // কোনো I/O কাজ (যেমন ফাইল পড়া বা API কল)
    Thread.sleep(1000); // Simulating delay
    return "Data fetched from API";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println);

2. Computation Scheduler

  • উপযুক্ত কাজ:
    CPU-ইনটেনসিভ কাজ যেমন ডেটা প্রসেসিং, অ্যালগরিদম চালনা, বা হিসাব-সংক্রান্ত কাজের জন্য।
  • বিশেষ বৈশিষ্ট্য:
    • এটি একটি fixed-size thread pool ব্যবহার করে।
    • থ্রেড সংখ্যা ডিভাইসের CPU কোরের সমান।
    • এটি I/O সম্পর্কিত কাজের জন্য নয়।
  • ব্যবহার:
    Schedulers.computation()

উদাহরণ:

Observable.range(1, 10)
    .map(i -> {
        System.out.println("Processing on: " + Thread.currentThread().getName());
        return i * 2; // Computational task
    })
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.single())
    .subscribe(result -> System.out.println("Received: " + result));

3. New Thread Scheduler

  • উপযুক্ত কাজ:
    যখন প্রতিবার একটি নতুন থ্রেড তৈরি করা প্রয়োজন, যেমন স্বতন্ত্র কাজ যেখানে থ্রেড পুনঃব্যবহার করা ঠিক নয়।
  • বিশেষ বৈশিষ্ট্য:
    • এটি প্রতিটি কাজের জন্য একটি নতুন থ্রেড তৈরি করে।
    • এটি অনেক বেশি থ্রেড তৈরি করে, যা অতিরিক্ত ব্যয়বহুল হতে পারে।
  • ব্যবহার:
    Schedulers.newThread()

উদাহরণ:

Observable.create(emitter -> {
    emitter.onNext("Task started");
    Thread.sleep(500);
    emitter.onNext("Task completed");
    emitter.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.single())
.subscribe(System.out::println);

Schedulers-এর সংমিশ্রণ

RxJava-তে আপনি সাধারণত দুটি Scheduler ব্যবহার করেন:

  1. subscribeOn(Scheduler):
    কোন Scheduler-এ Observable কাজ করবে তা নির্ধারণ করে।
  2. observeOn(Scheduler):
    কোন Scheduler-এ Observer কাজ করবে তা নির্ধারণ করে।

উদাহরণ:

Observable.just("Task")
    .subscribeOn(Schedulers.io())      // কাজ I/O থ্রেডে সম্পন্ন হবে
    .observeOn(Schedulers.computation()) // রেজাল্ট computation থ্রেডে প্রসেস হবে
    .subscribe(result -> System.out.println("Received on: " + Thread.currentThread().getName()));

তুলনামূলক বিশ্লেষণ

Schedulerব্যবহারযোগ্যতাথ্রেড ব্যবস্থাপনাউদাহরণ কাজ
IO SchedulerI/O সম্পর্কিত কাজথ্রেড পুল, থ্রেড পুনঃব্যবহারREST API, ফাইল পড়া বা লেখা
Computation SchedulerCPU-ইনটেনসিভ কাজFixed-size (CPU cores অনুযায়ী)ডেটা প্রসেসিং, অ্যালগরিদম চালনা
New Thread Schedulerস্বতন্ত্র নতুন থ্রেড প্রয়োজনপ্রতিবার নতুন থ্রেড তৈরিনির্দিষ্ট স্বতন্ত্র কাজ

প্রধান বিষয়:

  1. IO Scheduler: অধিকাংশ অ্যাসিঙ্ক্রোনাস কাজের জন্য আদর্শ।
  2. Computation Scheduler: CPU-নির্ভর কাজের জন্য সেরা।
  3. New Thread Scheduler: বিশেষ পরিস্থিতিতে স্বতন্ত্র নতুন থ্রেড প্রয়োজন হলে ব্যবহার করুন।

Schedulers নির্বাচন করার সময় কাজের ধরন এবং পারফরম্যান্স প্রয়োজনীয়তা বিবেচনা করুন।

Content added By

RxJava-তে subscribeOn() এবং observeOn() অপারেটর ব্যবহার করে বিভিন্ন থ্রেডে কাজগুলি পরিচালনা করা যায়। এগুলো concurrency এবং threading এর কন্ট্রোল দিতে সাহায্য করে।


subscribeOn() অপারেটর

  • কোথায় কাজ করে:
    এটি নির্ধারণ করে যে Observable কোন থ্রেডে কাজ করবে বা ডেটা এমিট করবে।
  • ব্যবহার:
    subscribeOn() অপারেটরটি শুধুমাত্র একবার কাজ করে এবং ডেটা এমিশন প্রসেসে প্রথমে প্রয়োগ করা হয়।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Observable thread: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io()) // Observable runs on IO thread
    .subscribe(item -> System.out.println("Received: " + item));

আউটপুট:

Observable thread: RxCachedThreadScheduler
Received: 1

observeOn() অপারেটর

  • কোথায় কাজ করে:
    এটি নির্ধারণ করে যে Observer বা সাবস্ক্রাইবার কোন থ্রেডে কাজ করবে।
  • ব্যবহার:
    observeOn() অপারেটরটি ডেটা প্রক্রিয়াকরণের থ্রেড পরিবর্তন করতে ব্যবহার করা হয়। এটি একাধিকবার প্রয়োগ করা যেতে পারে।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Observable thread: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())              // Observable runs on IO thread
    .observeOn(Schedulers.computation())       // Observer runs on Computation thread
    .subscribe(item -> System.out.println("Observer thread: " + Thread.currentThread().getName()));

আউটপুট:

Observable thread: RxCachedThreadScheduler
Observer thread: RxComputationThreadPool

subscribeOn() এবং observeOn() একসাথে ব্যবহার

RxJava-তে এই দুটি অপারেটর একত্রে ব্যবহার করলে, আপনি Observable এবং Observer এর থ্রেড আলাদাভাবে নির্ধারণ করতে পারেন।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Emitting on: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())               // Observable runs on IO thread
    .observeOn(Schedulers.computation())        // Observer runs on Computation thread
    .subscribe(
        item -> System.out.println("Processing on: " + Thread.currentThread().getName() + " - Received: " + item)
    );

আউটপুট:

Emitting on: RxCachedThreadScheduler
Processing on: RxComputationThreadPool - Received: 1
Processing on: RxComputationThreadPool - Received: 2

মাল্টিপল observeOn() ব্যবহার

observeOn() একাধিকবার ব্যবহার করলে, প্রতিটি পরবর্তী ধাপ আলাদা থ্রেডে চালানো যায়।

উদাহরণ:

Observable<Integer> observable = Observable.create(emitter -> {
    System.out.println("Emitting on: " + Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
});

observable
    .subscribeOn(Schedulers.io())                // Emission happens on IO thread
    .observeOn(Schedulers.computation())         // First observer runs on Computation thread
    .map(item -> {
        System.out.println("Mapping on: " + Thread.currentThread().getName());
        return item * 2;
    })
    .observeOn(Schedulers.newThread())           // Second observer runs on a New thread
    .subscribe(item -> System.out.println("Final processing on: " + Thread.currentThread().getName() + " - Received: " + item));

আউটপুট:

Emitting on: RxCachedThreadScheduler
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 2
Mapping on: RxComputationThreadPool
Final processing on: RxNewThreadScheduler - Received: 4

Schedulers সম্পর্কে সংক্ষিপ্ত বিবরণ:

RxJava-তে বিভিন্ন ধরনের Schedulers রয়েছে, যেগুলো থ্রেড ম্যানেজমেন্টে ব্যবহৃত হয়।

  1. Schedulers.io()
    • IO সম্পর্কিত কাজ (ডাটাবেজ, নেটওয়ার্ক কল) এর জন্য ব্যবহৃত হয়।
    • Cached thread pool ব্যবহার করে।
  2. Schedulers.computation()
    • CPU-ইনটেনসিভ কাজ (কম্পিউটেশন) এর জন্য ব্যবহৃত হয়।
    • Fixed thread pool ব্যবহার করে (CPU cores এর উপর ভিত্তি করে)।
  3. Schedulers.newThread()
    • প্রতিবার একটি নতুন থ্রেড তৈরি করে।
  4. AndroidSchedulers.mainThread() (শুধু Android-এর জন্য)
    • Main UI thread-এ কাজ পরিচালনা করার জন্য ব্যবহৃত হয়।

উপসংহার:

  • subscribeOn() অপারেটর ব্যবহার করে আপনি Observable এর থ্রেড নির্ধারণ করতে পারেন।
  • observeOn() অপারেটর Observer এর থ্রেড নিয়ন্ত্রণ করতে সাহায্য করে।
  • একসাথে ব্যবহার করে আপনি একাধিক থ্রেডে কাজের লজিক তৈরি করতে পারেন।

এটি asynchronous এবং reactive programming-এ থ্রেড ম্যানেজমেন্টকে অনেক সহজ এবং কার্যকর করে তোলে।

Content added By

RxJava-তে Schedulers এবং Concurrency ব্যবহার করে asynchronous এবং parallel কাজ সহজে পরিচালনা করা যায়। Schedulers মূলত RxJava-কে বলে দেয় কোন থ্রেডে কাজটি সম্পন্ন করতে হবে। এটি ব্যাকগ্রাউন্ড থ্রেডে computational বা I/O কাজ পরিচালনা এবং UI থ্রেডে রেজাল্ট আপডেট করতে ব্যবহৃত হয়।


Schedulers এর ধরন

RxJava বিভিন্ন ধরনের Schedulers সরবরাহ করে, যেমন:

  1. Schedulers.io():
    • I/O অপারেশন (e.g., নেটওয়ার্ক কল, ফাইল অপারেশন) পরিচালনার জন্য ব্যবহৃত হয়।
    • এটি থ্রেড পুল ব্যবহার করে।
  2. Schedulers.computation():
    • Computational কাজ (e.g., ডেটা প্রসেসিং, algorithm) পরিচালনার জন্য ব্যবহৃত হয়।
    • এটি CPU-এর core অনুযায়ী থ্রেড তৈরি করে।
  3. Schedulers.newThread():
    • প্রতিবার একটি নতুন থ্রেড তৈরি করে।
  4. Schedulers.single():
    • একটি একক থ্রেডে কাজ সম্পন্ন করে।
  5. AndroidSchedulers.mainThread() (Android এ ব্যবহৃত):
    • UI থ্রেডে কাজ সম্পন্ন করে।

উদাহরণ 1: Schedulers ব্যবহার

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaSchedulersExample {
    public static void main(String[] args) throws InterruptedException {

        // Create an Observable
        Observable<String> observable = Observable.create(emitter -> {
            System.out.println("Observable is emitting on: " + Thread.currentThread().getName());
            emitter.onNext("Hello");
            emitter.onNext("RxJava");
            emitter.onComplete();
        });

        // Subscribe with Schedulers
        observable
            .subscribeOn(Schedulers.io()) // Specify the thread for Observable
            .observeOn(Schedulers.computation()) // Specify the thread for Observer
            .subscribe(
                item -> System.out.println("Received: " + item + " on " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed on " + Thread.currentThread().getName())
            );

        // Sleep to allow asynchronous operation to complete
        Thread.sleep(1000);
    }
}

কোড বিশ্লেষণ:

  1. subscribeOn(Schedulers.io()):
    • Observable-এ কাজ io থ্রেডে সম্পন্ন হবে।
  2. observeOn(Schedulers.computation()):
    • Observer-এ কাজ computation থ্রেডে সম্পন্ন হবে।
  3. Thread.sleep(1000):
    • Asynchronous কাজ শেষ হওয়ার জন্য মেইন থ্রেডকে অপেক্ষা করতে দেয়া হয়েছে।

আউটপুট:

Observable is emitting on: RxCachedThreadScheduler-1
Received: Hello on RxComputationThreadPool-1
Received: RxJava on RxComputationThreadPool-1
Completed on RxComputationThreadPool-1

উদাহরণ 2: Schedulers ব্যবহার করে Parallel Execution

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaParallelExample {
    public static void main(String[] args) throws InterruptedException {

        // Observable emitting multiple items
        Observable.range(1, 5)
            .flatMap(number -> 
                Observable.just(number)
                    .subscribeOn(Schedulers.computation()) // Parallel execution
                    .map(item -> {
                        System.out.println("Processing item " + item + " on " + Thread.currentThread().getName());
                        return item * 2; // Simulate processing
                    })
            )
            .observeOn(Schedulers.single()) // Collect results on a single thread
            .subscribe(
                result -> System.out.println("Result: " + result + " on " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("All items processed!")
            );

        // Sleep to allow asynchronous operation to complete
        Thread.sleep(2000);
    }
}

কোড বিশ্লেষণ:

  1. flatMap():
    • প্রতিটি number আলাদা আলাদা থ্রেডে parallelভাবে প্রক্রিয়াকরণ করা হয়।
  2. Schedulers.computation():
    • Computational কাজ parallelভাবে সম্পন্ন করতে ব্যবহৃত হয়েছে।
  3. Schedulers.single():
    • Observer এর কাজ একটি single থ্রেডে সমাপ্ত হয়।

আউটপুট:

Processing item 1 on RxComputationThreadPool-1
Processing item 2 on RxComputationThreadPool-2
Processing item 3 on RxComputationThreadPool-3
Processing item 4 on RxComputationThreadPool-4
Processing item 5 on RxComputationThreadPool-5
Result: 2 on RxSingleScheduler-1
Result: 4 on RxSingleScheduler-1
Result: 6 on RxSingleScheduler-1
Result: 8 on RxSingleScheduler-1
Result: 10 on RxSingleScheduler-1
All items processed!

Schedulers এর ব্যবহার ক্ষেত্র:

  1. Background Tasks:
    • Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে Schedulers.computation()
  2. Network Calls:
    • API কল বা ডেটাবেস অপারেশন পরিচালনায় Schedulers.io()
  3. UI Updates:
    • Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে AndroidSchedulers.mainThread()

RxJava-র Schedulers asynchronous এবং parallel কাজগুলো সহজ ও কার্যকরভাবে পরিচালনার জন্য অপরিহার্য। উপরের উদাহরণগুলো ব্যবহার করে বিভিন্ন কাজের জন্য সঠিক Schedulers নির্বাচন করা শিখুন।

Content added By
Promotion

Are you sure to start over?

Loading...