Observable এবং Observer

আরএক্সজাভা (RxJava) - Java Technologies

359

RxJava-তে Observable এবং Observer হল দুটি মূল উপাদান, যা Reactive Programming-এর মডেলের ভিত্তি গঠন করে। এগুলি ডেটা স্ট্রিম তৈরি ও প্রক্রিয়াকরণ করতে ব্যবহৃত হয়।


Observable:

Observable একটি ডেটা স্ট্রিম উৎপন্ন করে, যা এক বা একাধিক Observer-কে ডেটা সরবরাহ করে। এটি ডেটা "প্রকাশকারী" (Publisher) হিসেবে কাজ করে।

Observable-এর ভূমিকা:

  • ডেটা উৎপন্ন করে।
  • Observer-কে সেই ডেটা প্রদান করে।
  • প্রক্রিয়ার শেষে "complete" বা "error" সংকেত দেয়।

Observable তৈরি করার পদ্ধতি:

  1. just(): একটি নির্দিষ্ট ডেটা প্রদান করে।

    Observable<String> observable = Observable.just("Item 1", "Item 2", "Item 3");
    
  2. fromArray(): একটি array থেকে Observable তৈরি করে।

    Observable<Integer> observable = Observable.fromArray(1, 2, 3, 4, 5);
    
  3. create(): কাস্টম Observable তৈরি করতে।

    Observable<String> observable = Observable.create(emitter -> {
        emitter.onNext("Hello");
        emitter.onNext("RxJava");
        emitter.onComplete();
    });
    

Observer:

Observer হল সেই উপাদান, যা Observable-এর ডেটা সাবস্ক্রাইব করে এবং সেই ডেটা প্রক্রিয়া করে। এটি ডেটা "গ্রাহক" (Subscriber) হিসেবে কাজ করে।

Observer-এর ভূমিকা:

  • Observable-এর ডেটা গ্রহণ করে।
  • প্রাপ্ত ডেটার উপর প্রক্রিয়াকরণ সম্পন্ন করে।
  • Observable থেকে "complete" বা "error" সংকেতের প্রতিক্রিয়া জানায়।

Observer তৈরি করার পদ্ধতি:

  1. Anonymous Class ব্যবহার করে:

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("Subscribed");
        }
    
        @Override
        public void onNext(String item) {
            System.out.println("Received: " + item);
        }
    
        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    };
    
  2. Lambda Expressions ব্যবহার করে:
    সহজ কোডিং স্টাইল।

    observable.subscribe(
        item -> System.out.println("Received: " + item), // onNext
        error -> System.err.println("Error: " + error),  // onError
        () -> System.out.println("Done!")               // onComplete
    );
    

Observable এবং Observer-এর Workflow:

  1. Observable ডেটা স্ট্রিম তৈরি করে।
  2. Observer সেই Observable-এ সাবস্ক্রাইব করে।
  3. Observable ডেটা (onNext), error (onError), এবং শেষের সংকেত (onComplete) প্রদান করে।

উদাহরণ:

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class RxJavaExample {
    public static void main(String[] args) {
        // Observable তৈরি করা
        Observable<String> observable = Observable.just("Hello", "RxJava");

        // Observer তৈরি করা
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Subscribed to Observable");
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("All data received!");
            }
        };

        // Observable-এ Observer সাবস্ক্রাইব করা
        observable.subscribe(observer);
    }
}

Key Points:

  • Observable ডেটা তৈরি ও প্রেরণ করে।
  • Observer সেই ডেটা গ্রহণ ও প্রক্রিয়া করে।
  • onNext(), onError(), এবং onComplete() মেথডগুলো ব্যবহার করে ডেটা এবং অবস্থা পর্যবেক্ষণ করা হয়।
  • Observable এবং Observer-এর মধ্যে সম্পর্ক subscribe() মেথডের মাধ্যমে স্থাপন করা হয়।

Reactive Programming-এর ভিত্তি এই ধারণাগুলির উপর নির্ভরশীল। এগুলি বুঝে নিলে RxJava ব্যবহার অনেক সহজ হয়ে যায়।

Content added By

Observable হলো RxJava-এর একটি প্রধান কম্পোনেন্ট, যা data emitter (ডেটা প্রেরণকারী) হিসেবে কাজ করে। এটি ডেটা বা ইভেন্টের একটি ধারাবাহিক স্ট্রিম প্রদান করে। Observer (ডেটা গ্রহণকারী) এই স্ট্রিম থেকে ডেটা সংগ্রহ করে এবং সেটির উপর কাজ করে।

Observable মূলত Publisher এর কাজ করে এবং Reactive Programming এর Publish-Subscribe Pattern অনুসরণ করে।


Observable কিভাবে কাজ করে?

Observable-এর কাজের প্রধান ধাপগুলো নিম্নরূপ:

  1. Create:
    Observable ডেটা স্ট্রিম তৈরি করে। এটি ডেটা emit (প্রেরণ) করার জন্য দায়ী।
  2. Subscribe:
    একটি Observer বা Subscriber Observable-এ subscribe করে। এটি ডেটা বা ইভেন্ট গ্রহণ করার জন্য প্রস্তুত হয়।
  3. Emit:
    Observable বিভিন্ন ধরণের ডেটা emit করে (যেমন, single value, multiple values, বা error)। এটি তিনটি প্রকারের notification পাঠাতে পারে:
    • onNext(): প্রতিটি ডেটা আইটেম পাঠানোর জন্য।
    • onError(): কোনো ত্রুটি হলে।
    • onComplete(): ডেটা স্ট্রিম শেষ হলে।
  4. Dispose:
    Subscription বন্ধ করার জন্য dispose() ব্যবহার করা হয়। এটি মেমোরি লিক এড়াতে সাহায্য করে।

Observable-এর উদাহরণ

1. Simple Observable Example:

import io.reactivex.Observable;

public class ObservableExample {
    public static void main(String[] args) {
        // Observable তৈরি
        Observable<String> observable = Observable.just("Hello", "RxJava", "World");

        // Observer তৈরি এবং Subscribe করা
        observable.subscribe(
            item -> System.out.println("Received: " + item),     // onNext
            error -> System.err.println("Error: " + error),      // onError
            () -> System.out.println("Completed!")               // onComplete
        );
    }
}

আউটপুট:

Received: Hello  
Received: RxJava  
Received: World  
Completed!  

2. Observable এবং Operators:

import io.reactivex.Observable;

public class ObservableWithOperators {
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);

        observable
            .map(number -> number * 2)    // প্রতিটি সংখ্যাকে ২ দিয়ে গুন
            .filter(number -> number > 5) // শুধুমাত্র > 5 এর সংখ্যা নির্বাচন
            .subscribe(
                item -> System.out.println("Processed: " + item), // onNext
                error -> System.err.println("Error: " + error),   // onError
                () -> System.out.println("Stream Completed!")     // onComplete
            );
    }
}

আউটপুট:

Processed: 6  
Processed: 8  
Processed: 10  
Stream Completed!  

3. Observable থেকে Error Handling:

import io.reactivex.Observable;

public class ObservableErrorHandling {
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Exception("Something went wrong!"));
            emitter.onNext(3); // এটি আর পাঠানো হবে না
            emitter.onComplete();
        });

        observable.subscribe(
            item -> System.out.println("Received: " + item),
            error -> System.err.println("Error: " + error.getMessage()), // Error Handling
            () -> System.out.println("Completed!")
        );
    }
}

আউটপুট:

Received: 1  
Received: 2  
Error: Something went wrong!  

Observable তৈরি করার পদ্ধতি

RxJava-তে Observable তৈরির বিভিন্ন উপায় রয়েছে:

  1. just(): এক বা একাধিক নির্দিষ্ট ডেটা emit করে।

    Observable<Integer> observable = Observable.just(1, 2, 3);
    
  2. fromIterable(): Iterable collection থেকে ডেটা emit করে।

    List<String> items = Arrays.asList("A", "B", "C");
    Observable<String> observable = Observable.fromIterable(items);
    
  3. create(): Custom ডেটা স্ট্রিম তৈরি করতে।

    Observable<Integer> observable = Observable.create(emitter -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    });
    
  4. range(): একটি নির্দিষ্ট সীমার মধ্যে সংখ্যা emit করে।

    Observable<Integer> observable = Observable.range(1, 5);
    
  5. interval(): নির্দিষ্ট সময়ের বিরতিতে ডেটা emit করে (সাধারণত asynchronous কাজের জন্য ব্যবহৃত)।

    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
    

Observable-এর সুবিধা

  1. Asynchronous এবং Event-driven Programming সহজ করে।
  2. Functional এবং Declarative কোডিং স্টাইল।
  3. Operators-এর মাধ্যমে সহজ ডেটা প্রক্রিয়াকরণ।
  4. Thread Management-এর জন্য Schedulers ব্যবহার।

Observable এর সীমাবদ্ধতা

  1. শিখতে কিছুটা সময় লাগে।
  2. Debugging অনেক সময় জটিল হয়ে যায়।
  3. Subscription সঠিকভাবে dispose() না করলে মেমোরি লিক হতে পারে।

RxJava-তে Observable একটি গুরুত্বপূর্ণ ভূমিকা পালন করে এবং এটি asynchronous programming আরও কার্যকর ও সহজতর করে।

Content added By

RxJava-তে Observer হলো একটি ইন্টারফেস, যা Observable-এর সাথে কাজ করে। এটি ডেটা স্ট্রিম পর্যবেক্ষণ করে এবং স্ট্রিম থেকে প্রাপ্ত ডেটা বা ইভেন্টগুলোতে প্রতিক্রিয়া জানায়।

Observer-এর ভূমিকা:

  1. ডেটা স্ট্রিম পর্যবেক্ষণ:
    Observer Observable-এর তৈরি ডেটা স্ট্রিম পর্যবেক্ষণ করে এবং নতুন ডেটা বা ইভেন্ট প্রাপ্ত হলে তা গ্রহণ করে।
  2. ইভেন্ট হ্যান্ডলিং:
    Observer তিন ধরনের ইভেন্ট পরিচালনা করতে পারে:
    • onNext(): যখন নতুন ডেটা আসে।
    • onError(): যখন কোনো ত্রুটি ঘটে।
    • onComplete(): যখন ডেটা স্ট্রিম সম্পূর্ণ হয়।
  3. রেসপন্স তৈরি:
    ডেটা বা ইভেন্টের উপর ভিত্তি করে Observer একটি নির্দিষ্ট অ্যাকশন বা রেসপন্স সম্পাদন করে।

Observer কাজের প্রক্রিয়া:

  1. Subscription:
    Observer একটি Observable-এর সাথে সংযুক্ত হয় (Subscribe করে), এবং এর ডেটা স্ট্রিম বা ইভেন্টগুলো পর্যবেক্ষণ শুরু করে।
  2. ইভেন্ট প্রাপ্তি:
    Observable থেকে ডেটা বা ইভেন্ট আসার সাথে সাথে Observer প্রাসঙ্গিক ইভেন্ট মেথড (onNext, onError, বা onComplete) কল করে।
  3. অ্যাকশন সম্পাদন:
    প্রতিটি ইভেন্টে নির্ধারিত অ্যাকশন বা অপারেশন সম্পাদিত হয়।

উদাহরণ: Observer-এর কাজ

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class ObserverExample {
    public static void main(String[] args) {
        // Observable তৈরি
        Observable<String> observable = Observable.just("Hello", "RxJava", "World");

        // Observer তৈরি
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Subscribed to Observable");
            }

            @Override
            public void onNext(String s) {
                System.out.println("Received: " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error occurred: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("All data received. Completed!");
            }
        };

        // Observable-এ Observer যুক্ত করা
        observable.subscribe(observer);
    }
}

আউটপুট:

Subscribed to Observable
Received: Hello
Received: RxJava
Received: World
All data received. Completed!

মেথডসমূহের বিস্তারিত:

  1. onSubscribe(Disposable d):
    • Observable-এর সাথে Observer যুক্ত হলে প্রথমে এটি কল হয়।
    • এখানে আপনি Subscription ম্যানেজ করতে পারেন (যদি প্রয়োজন হয় Subscription বন্ধ করা)।
  2. onNext(T t):
    • Observable থেকে ডেটা প্রাপ্ত হলে এটি কল হয়।
    • প্রত্যেকটি ডেটার জন্য আলাদা করে onNext কল হয়।
  3. onError(Throwable e):
    • Observable-এ কোনো ত্রুটি ঘটলে এটি কল হয়।
    • একবার onError কল হলে, আর কোনো ডেটা বা ইভেন্ট পাঠানো হয় না।
  4. onComplete():
    • Observable থেকে সমস্ত ডেটা পাঠানোর পরে এটি কল হয়।
    • onComplete কল হলে, এটি নিশ্চিত করে যে ডেটা স্ট্রিম শেষ হয়েছে।

Observer এবং Observable-এর ইন্টারঅ্যাকশন:

  • Observable ডেটা ইমিট (emit) করে।
  • Observer সেই ডেটা গ্রহণ করে এবং নির্ধারিত ইভেন্ট হ্যান্ডলারে সাড়া দেয়।
  • Subscription-এর মাধ্যমে এই সম্পর্কটি সক্রিয় থাকে।

RxJava-তে Observer গুরুত্বপূর্ণ কারণ এটি অ্যাসিঙ্ক্রোনাস ইভেন্ট ড্রাইভেন প্রোগ্রামিং সহজ করে তোলে।

Content added By

Cold Observables এবং Hot Observables হল RxJava-এর দুটি গুরুত্বপূর্ণ ধারণা, যা Observable এর আচরণের উপর ভিত্তি করে বিভক্ত। এদের প্রধান পার্থক্য হল ডেটা স্ট্রিম কিভাবে প্রেরিত হয় এবং সাবস্ক্রাইবারদের (Subscribers) সাথে কিভাবে আচরণ করে।


Cold Observable:

  • ডেটা স্ট্রিম শুধুমাত্র সাবস্ক্রাইব করার পর শুরু হয়।
    যখন কেউ Observable-এ সাবস্ক্রাইব করে, তখন Observable ডেটা এমিট করা শুরু করে।
  • প্রতিটি সাবস্ক্রাইবার নতুন ডেটা স্ট্রিম পায়।
  • Cold Observable-এর ডেটা static থাকে, অর্থাৎ প্রত্যেক সাবস্ক্রাইবারের জন্য ডেটা পুনরায় এমিট হয়।
  • উদাহরণ: API কল, ডেটাবেস কোয়েরি, বা ফাইল রিড অপারেশন।

উদাহরণ:

import io.reactivex.Observable;

public class ColdObservableExample {
    public static void main(String[] args) {
        // Create a Cold Observable
        Observable<Integer> observable = Observable.just(1, 2, 3, 4);

        // First Subscriber
        observable.subscribe(item -> System.out.println("Subscriber 1: " + item));

        // Second Subscriber
        observable.subscribe(item -> System.out.println("Subscriber 2: " + item));
    }
}

আউটপুট:

Subscriber 1: 1  
Subscriber 1: 2  
Subscriber 1: 3  
Subscriber 1: 4  
Subscriber 2: 1  
Subscriber 2: 2  
Subscriber 2: 3  
Subscriber 2: 4  

Hot Observable:

  • ডেটা স্ট্রিম সাবস্ক্রাইবারদের জন্য অপেক্ষা না করে চলতে থাকে।
    Observable ডেটা এমিট করতে শুরু করে, এমনকি যদি কোনো সাবস্ক্রাইবার না থাকে।
  • নতুন সাবস্ক্রাইবাররা তখন থেকেই ডেটা পেতে শুরু করে যখন তারা সাবস্ক্রাইব করে। আগে প্রেরিত ডেটা তারা পায় না।
  • Hot Observable সাধারণত লাইভ ডেটা বা event-driven operations-এর জন্য ব্যবহৃত হয়।
  • উদাহরণ: Mouse clicks, Keyboard events, বা WebSocket connections।

উদাহরণ:

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class HotObservableExample {
    public static void main(String[] args) {
        // Create a Hot Observable using PublishSubject
        PublishSubject<Integer> hotObservable = PublishSubject.create();

        // First Subscriber subscribes
        hotObservable.subscribe(item -> System.out.println("Subscriber 1: " + item));

        // Emit some data
        hotObservable.onNext(1);
        hotObservable.onNext(2);

        // Second Subscriber subscribes
        hotObservable.subscribe(item -> System.out.println("Subscriber 2: " + item));

        // Emit more data
        hotObservable.onNext(3);
        hotObservable.onNext(4);
    }
}

আউটপুট:

Subscriber 1: 1  
Subscriber 1: 2  
Subscriber 1: 3  
Subscriber 2: 3  
Subscriber 1: 4  
Subscriber 2: 4  

Cold এবং Hot Observable-এর প্রধান পার্থক্য:

বৈশিষ্ট্যCold ObservableHot Observable
ডেটা স্ট্রিম শুরুসাবস্ক্রিপশন-এর পরে শুরু হয়।সাবস্ক্রিপশন-এর আগে থেকেই চলমান।
প্রত্যেক সাবস্ক্রাইবারের জন্য ডেটানতুন করে ডেটা এমিট হয়।সাবস্ক্রাইব করার সময় থেকে ডেটা পায়।
ব্যবহারক্ষেত্রস্ট্যাটিক বা ফিক্সড ডেটার জন্য।লাইভ ডেটা স্ট্রিমের জন্য।
উদাহরণAPI calls, Database queries।Mouse clicks, WebSocket data।

Hybrid Observables:

কিছু Observable, যেমন ConnectableObservable, Hot এবং Cold এর মিশ্রণ হিসাবে কাজ করতে পারে। এগুলি .publish() এবং .connect() এর মাধ্যমে কনফিগার করা যায়।

উপসংহার:

Cold Observable ডেটা পুনরায় তৈরি করতে পারে এবং নির্ধারিত ডেটা এমিট করার জন্য কার্যকর, যেখানে Hot Observable বাস্তব-সময়ের ডেটা বা ইভেন্ট পরিচালনার জন্য আদর্শ।

Content added By

RxJava-তে Observable এবং Observer হল দুইটি প্রধান component। নিচে উদাহরণসহ এই দুটি component কীভাবে তৈরি এবং ব্যবহার করা হয় তা দেখানো হলো।


1. Observable এবং Observer তৈরি:

Observable এমন একটি entity যা ডেটা ইমিট করে। Observer সেই ডেটা consume করে। Observable এবং Observer এর মধ্যে subscription এর মাধ্যমে ডেটা পাঠানো হয়।


উদাহরণ: Observable এবং Observer তৈরি

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class RxJavaExample {
    public static void main(String[] args) {

        // 1. Create an Observable
        Observable<String> observable = Observable.create(emitter -> {
            try {
                emitter.onNext("Item 1"); // Emit first item
                emitter.onNext("Item 2"); // Emit second item
                emitter.onNext("Item 3"); // Emit third item
                emitter.onComplete();     // Signal completion
            } catch (Exception e) {
                emitter.onError(e);      // Signal an error if any
            }
        });

        // 2. Create an Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Subscribed to Observable");
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error occurred: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("All items are received!");
            }
        };

        // 3. Subscribe the Observer to the Observable
        observable.subscribe(observer);
    }
}

কোড বিশ্লেষণ:

  1. Observable তৈরি:

    Observable<String> observable = Observable.create(emitter -> { ... });
    
    • create() মেথড ব্যবহার করে Observable তৈরি করা হয়েছে।
    • onNext() মেথড ডেটা ইমিট করে।
    • onComplete() ইভেন্ট শেষ হওয়ার সংকেত দেয়।
    • onError() কোনো ত্রুটি হলে তা সংকেত দেয়।
  2. Observer তৈরি:

    Observer<String> observer = new Observer<String>() { ... };
    
    • onSubscribe(): subscription শুরু হলে ডাকা হয়।
    • onNext() ডেটা রিসিভ হলে ডাকা হয়।
    • onError() ত্রুটি থাকলে ডাকা হয়।
    • onComplete() সব ডেটা পাওয়া শেষ হলে ডাকা হয়।
  3. Subscribe করা:

    observable.subscribe(observer);
    
    • Observer কে Observable এর সাথে সংযুক্ত করা হয়েছে।

2. সংক্ষিপ্ত উদাহরণ (Lambda Expressions):

RxJava এর সাথে Lambda expressions ব্যবহার করে কোডকে আরো সহজ করা যায়।

import io.reactivex.Observable;

public class RxJavaLambdaExample {
    public static void main(String[] args) {

        // Create an Observable
        Observable<String> observable = Observable.just("Hello", "RxJava", "World");

        // Subscribe to the Observable using Lambda
        observable.subscribe(
            item -> System.out.println("Received: " + item),  // onNext
            error -> System.out.println("Error: " + error),   // onError
            () -> System.out.println("Completed!")            // onComplete
        );
    }
}

Lambda কোড বিশ্লেষণ:

  1. Observable.just() একটি Observable তৈরি করে যা ডেটা ইমিট করে।
  2. subscribe() এর ভিতরে তিনটি lambda function ব্যবহার করা হয়েছে:
    • প্রথমটি ডেটা (onNext) প্রক্রিয়াকরণে,
    • দ্বিতীয়টি ত্রুটি (onError) হ্যান্ডেল করতে,
    • তৃতীয়টি সম্পন্ন হলে (onComplete) একটি সংকেত পাঠাতে।

আউটপুট:

উদাহরণ 1:

Subscribed to Observable
Received: Item 1
Received: Item 2
Received: Item 3
All items are received!

উদাহরণ 2:

Received: Hello
Received: RxJava
Received: World
Completed!

লক্ষ্য:

  1. Observable এবং Observer এর কাজ asynchronous এবং event-driven হওয়ায় এটি বড় ডেটা বা real-time ইভেন্ট পরিচালনার জন্য উপযুক্ত।
  2. RxJava এর Lambda expressions কোডকে concise এবং সহজ করে তোলে।
Content added By
Promotion

Are you sure to start over?

Loading...