উদাহরণ সহ Schedulers এবং Concurrency

Schedulers এবং Concurrency - আরএক্সজাভা (RxJava) - Java Technologies

346

RxJava-তে Schedulers এবং Concurrency ব্যবহার করে asynchronous এবং parallel কাজ সহজে পরিচালনা করা যায়। Schedulers মূলত RxJava-কে বলে দেয় কোন থ্রেডে কাজটি সম্পন্ন করতে হবে। এটি ব্যাকগ্রাউন্ড থ্রেডে computational বা I/O কাজ পরিচালনা এবং UI থ্রেডে রেজাল্ট আপডেট করতে ব্যবহৃত হয়।


Schedulers এর ধরন

RxJava বিভিন্ন ধরনের Schedulers সরবরাহ করে, যেমন:

  1. Schedulers.io():
    • I/O অপারেশন (e.g., নেটওয়ার্ক কল, ফাইল অপারেশন) পরিচালনার জন্য ব্যবহৃত হয়।
    • এটি থ্রেড পুল ব্যবহার করে।
  2. Schedulers.computation():
    • Computational কাজ (e.g., ডেটা প্রসেসিং, algorithm) পরিচালনার জন্য ব্যবহৃত হয়।
    • এটি CPU-এর core অনুযায়ী থ্রেড তৈরি করে।
  3. Schedulers.newThread():
    • প্রতিবার একটি নতুন থ্রেড তৈরি করে।
  4. Schedulers.single():
    • একটি একক থ্রেডে কাজ সম্পন্ন করে।
  5. AndroidSchedulers.mainThread() (Android এ ব্যবহৃত):
    • UI থ্রেডে কাজ সম্পন্ন করে।

উদাহরণ 1: Schedulers ব্যবহার

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaSchedulersExample {
    public static void main(String[] args) throws InterruptedException {

        // Create an Observable
        Observable<String> observable = Observable.create(emitter -> {
            System.out.println("Observable is emitting on: " + Thread.currentThread().getName());
            emitter.onNext("Hello");
            emitter.onNext("RxJava");
            emitter.onComplete();
        });

        // Subscribe with Schedulers
        observable
            .subscribeOn(Schedulers.io()) // Specify the thread for Observable
            .observeOn(Schedulers.computation()) // Specify the thread for Observer
            .subscribe(
                item -> System.out.println("Received: " + item + " on " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed on " + Thread.currentThread().getName())
            );

        // Sleep to allow asynchronous operation to complete
        Thread.sleep(1000);
    }
}

কোড বিশ্লেষণ:

  1. subscribeOn(Schedulers.io()):
    • Observable-এ কাজ io থ্রেডে সম্পন্ন হবে।
  2. observeOn(Schedulers.computation()):
    • Observer-এ কাজ computation থ্রেডে সম্পন্ন হবে।
  3. Thread.sleep(1000):
    • Asynchronous কাজ শেষ হওয়ার জন্য মেইন থ্রেডকে অপেক্ষা করতে দেয়া হয়েছে।

আউটপুট:

Observable is emitting on: RxCachedThreadScheduler-1
Received: Hello on RxComputationThreadPool-1
Received: RxJava on RxComputationThreadPool-1
Completed on RxComputationThreadPool-1

উদাহরণ 2: Schedulers ব্যবহার করে Parallel Execution

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaParallelExample {
    public static void main(String[] args) throws InterruptedException {

        // Observable emitting multiple items
        Observable.range(1, 5)
            .flatMap(number -> 
                Observable.just(number)
                    .subscribeOn(Schedulers.computation()) // Parallel execution
                    .map(item -> {
                        System.out.println("Processing item " + item + " on " + Thread.currentThread().getName());
                        return item * 2; // Simulate processing
                    })
            )
            .observeOn(Schedulers.single()) // Collect results on a single thread
            .subscribe(
                result -> System.out.println("Result: " + result + " on " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("All items processed!")
            );

        // Sleep to allow asynchronous operation to complete
        Thread.sleep(2000);
    }
}

কোড বিশ্লেষণ:

  1. flatMap():
    • প্রতিটি number আলাদা আলাদা থ্রেডে parallelভাবে প্রক্রিয়াকরণ করা হয়।
  2. Schedulers.computation():
    • Computational কাজ parallelভাবে সম্পন্ন করতে ব্যবহৃত হয়েছে।
  3. Schedulers.single():
    • Observer এর কাজ একটি single থ্রেডে সমাপ্ত হয়।

আউটপুট:

Processing item 1 on RxComputationThreadPool-1
Processing item 2 on RxComputationThreadPool-2
Processing item 3 on RxComputationThreadPool-3
Processing item 4 on RxComputationThreadPool-4
Processing item 5 on RxComputationThreadPool-5
Result: 2 on RxSingleScheduler-1
Result: 4 on RxSingleScheduler-1
Result: 6 on RxSingleScheduler-1
Result: 8 on RxSingleScheduler-1
Result: 10 on RxSingleScheduler-1
All items processed!

Schedulers এর ব্যবহার ক্ষেত্র:

  1. Background Tasks:
    • Heavy computational কাজ ব্যাকগ্রাউন্ডে সম্পন্ন করতে Schedulers.computation()
  2. Network Calls:
    • API কল বা ডেটাবেস অপারেশন পরিচালনায় Schedulers.io()
  3. UI Updates:
    • Android-এ UI থ্রেডে রেজাল্ট আপডেট করতে AndroidSchedulers.mainThread()

RxJava-র Schedulers asynchronous এবং parallel কাজগুলো সহজ ও কার্যকরভাবে পরিচালনার জন্য অপরিহার্য। উপরের উদাহরণগুলো ব্যবহার করে বিভিন্ন কাজের জন্য সঠিক Schedulers নির্বাচন করা শিখুন।

Content added By
Promotion

Are you sure to start over?

Loading...