Advanced RxJS Techniques

আরএক্সজেএস (RxJS) - Web Development

332

RxJS (Reactive Extensions for JavaScript) হল একটি শক্তিশালী লাইব্রেরি, যা অ্যাসিঙ্ক্রোনাস ডেটা স্ট্রিম এবং ইভেন্ট-ভিত্তিক প্রোগ্রামিংয়ে ব্যবহৃত হয়। RxJS ব্যবহারের মাধ্যমে আপনি reactive programming করতে পারেন, যেখানে ডেটার পরিবর্তন বা স্ট্রিমের ঘটনার উপর ভিত্তি করে প্রতিক্রিয়া জানানো হয়। তবে, RxJS-এর সাথে কাজ করার জন্য কিছু advanced techniques জানা জরুরি, যা আপনাকে আরও শক্তিশালী এবং দক্ষ অ্যাপ্লিকেশন তৈরি করতে সহায়তা করবে।

এই টিউটোরিয়ালে আমরা কিছু advanced RxJS techniques এবং operators সম্পর্কে আলোচনা করব, যেগুলি আপনার কোডকে আরও ক্লিন এবং পারফরম্যান্সে দক্ষ করবে।


1. Higher-order Observables

Higher-order Observables হল এমন Observable, যা অন্য Observable গুলিকে প্রসেস বা ফিরিয়ে আনে। RxJS-এ এই ধরনের Observables ব্যবহৃত হয় যখন একটি Observable এর ভিতরে অন্য Observable থাকে। এটি সাধারণত switchMap(), concatMap(), mergeMap(), এবং exhaustMap() এর মাধ্যমে করা হয়।

উদাহরণ: switchMap() ব্যবহার করে Higher-order Observables

import { of, interval } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';

const observable1 = interval(1000);  // Emit every second
const observable2 = of('A', 'B', 'C'); // Another observable

observable1.pipe(
  switchMap(() => observable2)  // Switch to observable2 every time observable1 emits
).subscribe(value => {
  console.log(value);  // Logs: A, B, C (each time observable1 emits)
});

ব্যাখ্যা:

  • switchMap() মূল Observable (observable1) থেকে নতুন Observable (observable2) কে সুইচ করে এবং শুধুমাত্র সবচেয়ে সাম্প্রতিক সাবস্ক্রিপশন থেকে মান গ্রহণ করে।
  • পূর্বের মানগুলো বাতিল করা হয় যখন নতুন Observable সক্রিয় হয়।

2. Multicasting Operators

Multicasting অপারেটরগুলির মাধ্যমে আপনি একাধিক সাবস্ক্রাইবারের কাছে একক Observable এর মান শেয়ার করতে পারেন। RxJS-এ এমন কিছু অপারেটর রয়েছে যা subject-based multicasting পরিচালনা করতে সহায়তা করে, যেমন share(), publish(), shareReplay() ইত্যাদি।

উদাহরণ: shareReplay() দিয়ে Multicasting

import { of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

const observable = of('Hello', 'World').pipe(
  shareReplay(1)  // Replay the last emitted value to all subscribers
);

observable.subscribe(value => console.log('Subscriber 1:', value));
observable.subscribe(value => console.log('Subscriber 2:', value));

আউটপুট:

Subscriber 1: Hello
Subscriber 1: World
Subscriber 2: Hello
Subscriber 2: World

ব্যাখ্যা:

  • shareReplay(1) ব্যবহার করে, সর্বশেষ মানটি replay করা হয় নতুন সাবস্ক্রাইবারদের কাছে।
  • এটি একটি hot observable তৈরি করে, যা পূর্বের মানগুলিকে সমস্ত সাবস্ক্রাইবারদের কাছে পাঠায়।

3. Windowing Operators

Windowing অপারেটরগুলি একটি Observable এর ডেটা স্ট্রিমকে ছোট ছোট অংশে বিভক্ত করার জন্য ব্যবহৃত হয়। এর মাধ্যমে আপনি বড় ডেটা স্ট্রিমকে ছোট ছোট windows এ বিভক্ত করতে পারেন, যা আরও সহজে প্রসেস করা যায়।

উদাহরণ: window() ব্যবহার করে Windowing

import { interval } from 'rxjs';
import { window, take, mergeAll } from 'rxjs/operators';

const observable = interval(1000);

observable.pipe(
  window(observable.pipe(take(3))),  // Group every 3 emissions into a window
  mergeAll()  // Flatten the windows
).subscribe(value => console.log(value));

আউটপুট:

0
1
2
3
4
5

ব্যাখ্যা:

  • window() ব্যবহার করে, আমরা স্ট্রিমের ডেটাকে গ্রুপ করে ফেলেছি (প্রতিটি ৩টি ভ্যালু একটি window হিসেবে তৈরি হবে) এবং mergeAll() ব্যবহার করে আমরা সেই windows গুলোকে একত্রিত করেছি।

4. Using the retryWhen() Operator for Retry Logic

retryWhen() একটি অত্যন্ত শক্তিশালী অপারেটর যা ত্রুটির পর পুনরায় চেষ্টা করার জন্য custom retry logic তৈরি করতে ব্যবহৃত হয়। এটি ত্রুটি ঘটলে পুনরায় চেষ্টা করার জন্য কাস্টম লজিক তৈরি করতে সহায়তা করে।

উদাহরণ: retryWhen() দিয়ে Custom Retry Logic

import { of, throwError } from 'rxjs';
import { retryWhen, delay, mergeMap } from 'rxjs/operators';

const observable = throwError('An error occurred');

observable.pipe(
  retryWhen(errors =>
    errors.pipe(
      mergeMap((error, index) => {
        if (index < 3) {
          console.log(`Retrying... attempt #${index + 1}`);
          return of(error).pipe(delay(1000));  // Retry after 1 second
        }
        return throwError('Retries failed');
      })
    )
  )
).subscribe({
  next: value => console.log(value),
  error: err => console.log('Error:', err)
});

আউটপুট:

Retrying... attempt #1
Retrying... attempt #2
Error: Retries failed

ব্যাখ্যা:

  • এখানে, retryWhen() ব্যবহার করে ত্রুটির পর ৩ বার পুনরায় চেষ্টা করা হয়েছে এবং প্রতি পুনরায় চেষ্টা করার পর ১ সেকেন্ড বিলম্ব দেওয়া হয়েছে। পুনরায় চেষ্টা ৩ বার ব্যর্থ হলে, অবশেষে ত্রুটির বার্তা পাঠানো হয়েছে।

5. Custom Operators

RxJS এর আরেকটি শক্তিশালী বৈশিষ্ট্য হল custom operators তৈরি করার ক্ষমতা। আপনি নিজের অপারেটর তৈরি করতে পারেন যা আপনার প্রয়োজন অনুযায়ী স্ট্রিমের ডেটা প্রসেস করে। একটি কাস্টম অপারেটর সাধারণত একটি pipeable function হয় যা একটি Observable কে ইনপুট হিসেবে নিয়ে একটি নতুন Observable ফেরত দেয়।

উদাহরণ: Custom Operator তৈরি করা

import { Observable } from 'rxjs';

// Custom operator to multiply the emitted value by 2
function multiplyByTwo() {
  return (source: Observable<number>) => {
    return new Observable(subscriber => {
      source.subscribe({
        next(value) {
          subscriber.next(value * 2);
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        }
      });
    });
  };
}

import { of } from 'rxjs';

of(1, 2, 3).pipe(
  multiplyByTwo()  // Using the custom operator
).subscribe(value => console.log(value));

আউটপুট:

2
4
6

ব্যাখ্যা:

  • এখানে একটি কাস্টম অপারেটর multiplyByTwo তৈরি করা হয়েছে, যা স্ট্রিমের প্রতিটি মানকে ২ দিয়ে গুণ করে নতুন স্ট্রিম তৈরি করছে।

6. Using combineLatest() for Multi-Stream Data

combineLatest() অপারেটরটি একাধিক স্ট্রিম থেকে সর্বশেষ মান নিয়ে একটি নতুন Observable তৈরি করে। এটি একাধিক অ্যাসিঙ্ক্রোনাস অপারেশন একত্রিত করার জন্য অত্যন্ত উপকারী।

উদাহরণ: combineLatest() ব্যবহার করা

import { interval } from 'rxjs';
import { combineLatest, map } from 'rxjs/operators';

const observable1 = interval(1000);  // Emits 0, 1, 2, 3...
const observable2 = interval(1500);  // Emits 0, 1, 2, 3...

combineLatest([observable1, observable2]).pipe(
  map(([value1, value2]) => `Combined values: ${value1}, ${value2}`)
).subscribe(console.log);

আউটপুট:

Combined values: 0, 0
Combined values: 1, 0
Combined values: 1, 1
Combined values: 2, 1
Combined values: 2, 2

ব্যাখ্যা:

  • combineLatest() অপারেটরটি দুটি Observable এর সর্বশেষ মান একত্রিত করে একটি নতুন স্ট্রিম তৈরি করেছে। এটি একাধিক স্ট্রিমের ডেটা সিঙ্ক্রোনাইজ করতে খুবই কার্যকরী।

সারাংশ

RxJS-এ Advanced Techniques যেমন Higher-order Observables, Multicasting Operators, Windowing, Retry Logic, Custom Operators, এবং combineLatest() আপনাকে আরও শক্তিশালী এবং পরিপূর্ণ অ্যাসিঙ্ক্রোনাস ডেটা প্রসেসিংয়ের ক্ষমতা প্রদান করে। এগুলি ব্যবহার করে আপনি অ্যাসিঙ্ক্রোনাস ডেটা ম্যানিপুলেশন, স্ট্রিম সমন্বয়, এবং রিয়্যাক্টিভ প্যাটার্নে উন্নত কার্যক্ষমতা অর্জন করতে পারবেন।

এই টেকনিকগুলির সাহায্যে, আপনি RxJS-এ আরও জটিল এবং উন্নত reactive programming তৈরি করতে সক্ষম হবেন, যা আপনার অ্যাপ্লিকেশনকে আরও দক্ষ এবং স্কেলেবল করে তুলবে।

Content added By

RxJS (Reactive Extensions for JavaScript) অ্যাসিঙ্ক্রোনাস প্রোগ্রামিংয়ের জন্য একটি শক্তিশালী লাইব্রেরি। Higher-order Observables হল এমন একটি ধারণা যেখানে একটি Observable ভিতরে আরেকটি Observable ধারণ করে। এটি একটি স্তরের Observable যে আরেকটি Observable কে emit করে, যার ফলে higher-order নামে পরিচিত। এই ধরনের অবজার্ভেবলগুলো সাধারণত অ্যাসিঙ্ক্রোনাস অপারেশনগুলির মধ্যে কিছু নির্দিষ্ট ক্রম বা লজিক তৈরি করার জন্য ব্যবহৃত হয়।


Higher-order Observable কী?

Higher-order Observable হল একটি Observable যা অন্য Observable কে emits করে। অর্থাৎ, এটি এমন একটি Observable যা ভেতরে আরেকটি Observable ধারণ করে। যখন আপনি সাবস্ক্রাইব করেন, তখন আপনি ওই ভেতরের Observable-এর মানগুলোর ওপর কাজ করতে পারেন।

এটি কার্যকরী যখন আপনি এমন পরিস্থিতিতে কাজ করছেন যেখানে একাধিক অ্যাসিঙ্ক্রোনাস অপারেশন একে অপরের ওপর নির্ভরশীল এবং আপনি তাদের প্রসেসিং অর্ডার বা স্ট্রিম সিঙ্ক্রোনাইজ করতে চান।

Higher-order Observable এর ব্যবহার

Higher-order Observable ব্যবহার করতে আপনি সাধারণত switchMap(), mergeMap(), concatMap(), exhaustMap() ইত্যাদি অপারেটর ব্যবহার করেন। এই অপারেটরগুলো উচ্চতর অবজার্ভেবলগুলোর মধ্যে মান আদান-প্রদান করতে এবং তাদের স্ট্রিম ম্যানেজ করতে সহায়তা করে।


Higher-order Observables এর উদাহরণ

1. switchMap() এর মাধ্যমে Higher-order Observable

switchMap() অপারেটরটি একটি উচ্চতর অবজার্ভেবলকে সাবস্ক্রাইব করতে এবং তার ভেতরের Observable কে একটি নতুন Observable এ মান রূপান্তর করতে ব্যবহৃত হয়। এটি আগের সাবস্ক্রিপশন বাতিল করে এবং নতুন সাবস্ক্রিপশনকে গ্রহণ করে।

উদাহরণ: switchMap() দিয়ে Higher-order Observable

import { of, interval } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

// প্রথম Observable: ১ সেকেন্ডে সংখ্যা পাঠাবে
const outerObservable = interval(1000).pipe(take(3));

outerObservable.pipe(
  switchMap(outerValue => {
    console.log('Outer Observable value:', outerValue);
    // ভিতরের Observable: ১ সেকেন্ডে সংখ্যা পাঠাবে
    return interval(500).pipe(take(3)); // Nested Observable
  })
).subscribe(innerValue => {
  console.log('Inner Observable value:', innerValue);
});

আউটপুট:

Outer Observable value: 0
Inner Observable value: 0
Inner Observable value: 1
Inner Observable value: 2
Outer Observable value: 1
Inner Observable value: 0
Inner Observable value: 1
Inner Observable value: 2
Outer Observable value: 2
Inner Observable value: 0
Inner Observable value: 1
Inner Observable value: 2

এখানে, switchMap() অপারেটরটি মূল outerObservable থেকে প্রতিটি মান গ্রহণ করছে এবং সেই মান অনুযায়ী একটি নতুন nested Observable (ভিতরের Observable) তৈরি করছে। আগের সাবস্ক্রিপশনটি বাতিল (cancel) হয়ে যায় এবং নতুন সাবস্ক্রিপশন শুরু হয়।


2. mergeMap() এর মাধ্যমে Higher-order Observable

mergeMap() অপারেটরটি একই সময়ে একাধিক Observable এর সাবস্ক্রিপশন চালু করতে সহায়তা করে। এটি একাধিক স্ট্রিমকে একসাথে চালায় এবং তাদের মানগুলোর ওপর কাজ করে।

উদাহরণ: mergeMap() দিয়ে Higher-order Observable

import { of, interval } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

const outerObservable = of('First', 'Second', 'Third');

outerObservable.pipe(
  mergeMap(outerValue => {
    console.log('Outer value:', outerValue);
    // Nested Observable: ১ সেকেন্ডে সংখ্যা পাঠাবে
    return interval(500).pipe(take(3)); // Nested Observable
  })
).subscribe(innerValue => {
  console.log('Inner value:', innerValue);
});

আউটপুট:

Outer value: First
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Second
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Third
Inner value: 0
Inner value: 1
Inner value: 2

এখানে, mergeMap() অপারেটরটি প্রতিটি মানের জন্য একটি নতুন Observable তৈরি করেছে এবং সেই সব Observable গুলোকে একই সময়ে সাবস্ক্রাইব করে। এটি concurrent সাবস্ক্রিপশন চালানোর জন্য ব্যবহৃত হয়।


3. concatMap() এর মাধ্যমে Higher-order Observable

concatMap() অপারেটরটি একটি পর্যায়ক্রমিক higher-order observable এর জন্য ব্যবহৃত হয়। এটি সাবস্ক্রিপশনগুলোকে সিকোয়েন্সিয়ালি (sequentially) চালায়। অর্থাৎ, একটি সাবস্ক্রিপশন শেষ হলে পরবর্তীটি শুরু হয়।

উদাহরণ: concatMap() দিয়ে Higher-order Observable

import { of, interval } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';

const outerObservable = of('First', 'Second', 'Third');

outerObservable.pipe(
  concatMap(outerValue => {
    console.log('Outer value:', outerValue);
    // Nested Observable: ১ সেকেন্ডে সংখ্যা পাঠাবে
    return interval(500).pipe(take(3)); // Nested Observable
  })
).subscribe(innerValue => {
  console.log('Inner value:', innerValue);
});

আউটপুট:

Outer value: First
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Second
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Third
Inner value: 0
Inner value: 1
Inner value: 2

এখানে, concatMap() অপারেটরটি প্রতিটি outerObservable এর মানের জন্য একটি নতুন nestedObservable তৈরি করছে এবং সেগুলোকে একের পর এক (sequentially) সাবস্ক্রাইব করছে।


4. exhaustMap() এর মাধ্যমে Higher-order Observable

exhaustMap() অপারেটরটি তখন ব্যবহৃত হয় যখন আপনি চান যে কোনো নতুন observable সাবস্ক্রাইব করার আগে পূর্ববর্তী সাবস্ক্রিপশনটি শেষ হয়ে যাক। এটি নতুন সাবস্ক্রিপশন গ্রহণ করার আগে আগের সাবস্ক্রিপশনটি শেষ হওয়ার জন্য অপেক্ষা করে।

উদাহরণ: exhaustMap() দিয়ে Higher-order Observable

import { of, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const outerObservable = of('First', 'Second', 'Third');

outerObservable.pipe(
  exhaustMap(outerValue => {
    console.log('Outer value:', outerValue);
    // Nested Observable: ১ সেকেন্ডে সংখ্যা পাঠাবে
    return interval(500).pipe(take(3)); // Nested Observable
  })
).subscribe(innerValue => {
  console.log('Inner value:', innerValue);
});

আউটপুট:

Outer value: First
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Second
Inner value: 0
Inner value: 1
Inner value: 2
Outer value: Third
Inner value: 0
Inner value: 1
Inner value: 2

এখানে, exhaustMap() অপারেটরটি শুধুমাত্র প্রথম observable সাবস্ক্রাইব করছে এবং পরবর্তী observable গুলোর সাবস্ক্রিপশন গ্রহণ করবে না যতক্ষণ না আগেরটি শেষ হয়।


Higher-order Observables এর সুবিধা

  1. Multiple Layers of Asynchronous Data: Higher-order Observables ব্যবহারের মাধ্যমে আপনি অ্যাসিঙ্ক্রোনাস ডেটা প্রক্রিয়া করতে পারেন যেখানে একাধিক ডেটা স্ট্রিম একে অপরের ওপর নির্ভরশীল হতে পারে।
  2. Nested Operations: আপনি একাধিক nested operation এক্সিকিউট করতে পারেন, যেমন একটি HTTP রিকোয়েস্টের ফলস্বরূপ আরেকটি HTTP রিকোয়েস্ট বা ইভেন্ট।
  3. Better Control of Subscription Flow: Higher-order Observables আপনাকে সাবস্ক্রিপশনের ফ্লো (সিকোয়েন্সিয়ালি বা concurently) নিয়ন্ত্রণ করতে সহায়তা করে।

সারাংশ

Higher-order Observables হল RxJS এর একটি গুরুত্বপূর্ণ ধারণা যেখানে একটি Observable অন্য একটি Observable কে emits করে। এই ধরনের স্ট্রিমগুলির জন্য আপনি switchMap(), mergeMap(), concatMap(), exhaustMap() অপারেটর ব্যবহার করতে পারেন। এগুলি অ্যাসিঙ্ক্রোনাস স্ট্রিমের মধ্যে উচ্চতর স্তরের ডেটা প্রক্রিয়া করার জন্য ব্যবহৃত হয়। Higher-order Observables ব্যবহারের মাধ্যমে, আপনি বিভিন্ন অ্যাসিঙ্ক্রোনাস অপারেশনগুলির মধ্যে সঠিক সিঙ্ক্রোনাইজেশন এবং প্রতিক্রিয়া নিশ্চিত করতে পারেন।

Content added By

RxJS (Reactive Extensions for JavaScript) একটি শক্তিশালী লাইব্রেরি যা অ্যাসিঙ্ক্রোনাস ডেটা স্ট্রিম এবং ইভেন্ট-ভিত্তিক প্রোগ্রামিং সহজ এবং কার্যকরী করে তোলে। ডাইনামিক ডেটা স্ট্রিম (যেমন, রিয়েল-টাইম ডেটা, ইউজার ইনপুট, সার্ভার থেকে আসা ডেটা) এর প্রক্রিয়া ও নিয়ন্ত্রণে অনেক সময় বিশেষ অপারেটরের প্রয়োজন হয়, যাতে স্ট্রিমের উপরে আরও উন্নত এবং কার্যকরী কাজ করা যায়।

এখানে কিছু Advanced Operators সম্পর্কে আলোচনা করা হবে, যা ডাইনামিক ডেটা স্ট্রিম ম্যানিপুলেশন এবং কার্যকরী ফিল্টারিং ও ট্রান্সফরমেশন করার জন্য ব্যবহৃত হয়।


1. switchMap()

switchMap() অপারেটরটি একটি নতুন observable তৈরি করে এবং পুরনো observable কে বাতিল (unsubscribe) করে। এটি সাধারণত ব্যবহার করা হয় যখন আপনি নতুন মান প্রাপ্তির জন্য পুরনো স্ট্রিমের পরিবর্তে নতুন স্ট্রিম চান।

বৈশিষ্ট্য:

  • এটি নতুন Observable এ সুইচ করে, পুরনো Observable কে বাতিল করে দেয়।
  • এটি সাধারণত API কলের মত ফিচারগুলির জন্য ব্যবহৃত হয়, যেখানে প্রতিটি নতুন কল পূর্ববর্তী কলকে বাতিল করতে হবে।

উদাহরণ:

import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const observable = of('a', 'b', 'c').pipe(
  switchMap(value => {
    return of(value.toUpperCase());
  })
);

observable.subscribe(result => console.log(result));

আউটপুট:

A
B
C

এখানে, switchMap() অপারেটরটি নতুন Observable তৈরি করছে এবং প্রতিটি মান প্রাপ্তির জন্য পুরনো মান বাতিল করে।


2. concatMap()

concatMap() অপারেটরটি আসলে map() অপারেটরের মতোই কাজ করে, তবে এটি Observable গুলিকে একত্রে যুক্ত করে এবং স্ট্রিমের উপর sequentially (সিরিয়ালি) অপারেশন চালায়। এটি খুবই উপকারী যখন আপনি চাইছেন একের পর এক কার্যক্রম সম্পন্ন হোক, এবং একটি স্ট্রিম সম্পন্ন না হওয়া পর্যন্ত অন্য স্ট্রিম শুরু না হয়।

বৈশিষ্ট্য:

  • স্ট্রিমগুলোকে সিকোয়েন্সিয়ালি (serially) এক্সিকিউট করতে সহায়তা করে।
  • পরবর্তী স্ট্রিম এক্সিকিউট হওয়ার আগে পূর্ববর্তী স্ট্রিমের সম্পন্ন হওয়া দরকার।

উদাহরণ:

import { of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const observable = of(1, 2, 3).pipe(
  concatMap(value => of(value * 2))
);

observable.subscribe(result => console.log(result));

আউটপুট:

2
4
6

এখানে, concatMap() অপারেটরটি একের পর এক স্ট্রিমের মান নিয়ে কাজ করছে এবং সিকোয়েন্সিয়ালি তাদের প্রসেস করছে।


3. mergeMap()

mergeMap() অপারেটরটি একাধিক স্ট্রিমকে একযোগে প্রসেস করার জন্য ব্যবহৃত হয়। এটি বিভিন্ন Observable এর মানগুলোকে একত্রে সাবস্ক্রাইব করে এবং একযোগে (concurrently) ফলাফল পাঠায়।

বৈশিষ্ট্য:

  • একাধিক Observable কে একযোগে (concurrently) প্রসেস করে।
  • যখন আপনি অনেক স্ট্রিম একসাথে প্রসেস করতে চান, এটি উপকারী।

উদাহরণ:

import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const observable = of(1, 2, 3).pipe(
  mergeMap(value => of(value * 10))
);

observable.subscribe(result => console.log(result));

আউটপুট:

10
20
30

এখানে, mergeMap() অপারেটরটি প্রতিটি মানকে একযোগে প্রসেস করছে, এবং একই সময়ে একাধিক স্ট্রিমে কাজ চলছে।


4. debounceTime()

debounceTime() অপারেটরটি মূলত UI-ভিত্তিক অ্যাপ্লিকেশনে ব্যবহার হয় যেখানে একটি ইউজার দ্রুত ইনপুট বা ইভেন্ট তৈরি করে, এবং আপনি শুধু শেষ মান গ্রহণ করতে চান, যাতে অতিরিক্ত বা অপ্রয়োজনীয় রিকোয়েস্টগুলি এড়ানো যায়। এটি আসলে টাইমিং ভিত্তিক debouncing করতে ব্যবহৃত হয়।

বৈশিষ্ট্য:

  • একটি নির্দিষ্ট সময়ের মধ্যে নতুন ইনপুট বা পরিবর্তনগুলো মাপা হয় এবং শেষ মানটি রাখা হয়।
  • এটি ইভেন্টের চাপ কমাতে এবং শুধুমাত্র প্রয়োজনীয় মান গ্রহণ করতে ব্যবহৃত হয়।

উদাহরণ:

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const input = document.getElementById('search');

fromEvent(input, 'input').pipe(
  debounceTime(500), // 500 মিলিসেকেন্ড পর ইনপুট নেবে
  map(event => event.target.value)
).subscribe(value => console.log(value));

এখানে, debounceTime(500) ব্যবহার করা হয়েছে, যাতে ইনপুটের জন্য ৫০০ মিলিসেকেন্ড বিলম্ব করা হয় এবং ইউজারের ইনপুট কমিয়ে আনা হয়।


5. distinctUntilChanged()

distinctUntilChanged() অপারেটরটি আগের মানের সাথে মিলিয়ে দেখার মাধ্যমে ডুপ্লিকেট মান ফিল্টার করে। এটি এমন অবস্থায় কার্যকর যেখানে আপনি চান না যে একাধিকবার একই মান স্ট্রিমে আসুক।

বৈশিষ্ট্য:

  • পূর্ববর্তী মানের সঙ্গে তুলনা করে শুধুমাত্র distinct মান গ্রহণ করা হয়।
  • এটি স্ট্রিমে ডুপ্লিকেট মান ফিল্টার করে।

উদাহরণ:

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const observable = of(1, 1, 2, 2, 3, 3, 4);

observable.pipe(
  distinctUntilChanged()
).subscribe(value => console.log(value));

আউটপুট:

1
2
3
4

এখানে, distinctUntilChanged() অপারেটরটি পরপর একই মানগুলো ফিল্টার করে ফেলেছে এবং শুধুমাত্র ভিন্ন মানগুলো সাবস্ক্রাইবারে পাঠিয়েছে।


6. combineLatest()

combineLatest() অপারেটরটি একাধিক Observable থেকে সর্বশেষ মানগুলো একত্রিত করে একটি নতুন Observable তৈরি করে। এটি একাধিক স্ট্রিমের সর্বশেষ মান নিয়ে একটি নতুন স্ট্রিম তৈরি করে এবং সাবস্ক্রাইবারে পাঠায়।

বৈশিষ্ট্য:

  • একাধিক Observable এর সর্বশেষ মান একত্রিত করে একটি নতুন Observable তৈরি করে।
  • এটি সবগুলো Observable এর সর্বশেষ মান গুলি সাবস্ক্রাইবে পাঠাবে।

উদাহরণ:

import { combineLatest, of } from 'rxjs';

const observable1 = of(1, 2, 3);
const observable2 = of('a', 'b', 'c');

combineLatest([observable1, observable2]).subscribe(([val1, val2]) => {
  console.log(`Combined values: ${val1}, ${val2}`);
});

আউটপুট:

Combined values: 3, c

এখানে, combineLatest() দুটি Observable এর সর্বশেষ মান একত্রিত করে পাঠাচ্ছে।


7. bufferTime()

bufferTime() অপারেটরটি একটি টাইমার ভিত্তিক buffering কৌশল প্রয়োগ করে, যেখানে আপনি নির্দিষ্ট সময়সীমার মধ্যে আসা ইভেন্টগুলিকে গ্রুপ করে একটি অ্যারে তৈরি করেন। এটি সাধারনত যখন অনেকগুলো ইভেন্ট একসাথে আসছে, তখন ব্যবহৃত হয়।

বৈশিষ্ট্য:

  • নির্দিষ্ট সময়সীমার মধ্যে ইভেন্টগুলোকে গ্রুপ করে একটি অ্যারে তৈরি করে।

উদাহরণ:

import { fromEvent } from 'rxjs';
import { bufferTime } from 'rxjs/operators';

const clickStream = fromEvent(document, 'click');

clickStream.pipe(
  bufferTime(2000)  // Collect clicks in batches of 2 seconds
).subscribe(events => {
  console.log('Collected events:', events);
});

এখানে, bufferTime(2000) ব্যবহার করে, ক্লিক ইভেন্টগুলোকে ২ সেকেন্ডের জন্য গ্রুপ করা হচ্ছে।


সারাংশ

RxJS এর Advanced Operators ডাইনামিক ডেটা স্ট্রিমগুলোর প্রক্রিয়াকরণ এবং ম্যানিপুলেশন করতে অত্যন্ত কার্যকরী। এর মাধ্যমে আপনি ডেটার ফিল্টারিং, ট্রান্সফরমেশন, এবং একাধিক স্ট্রিমের মধ্যে সম্পর্ক স্থাপন করতে পারবেন।

  • switchMap(): একটি নতুন স্ট্রিমে সুইচ করে পুরনো স্ট্রিমকে বাতিল করে।
  • concatMap(): স্ট্রিমগুলোকে সিকোয়েন্সিয়ালি প্রসেস করে।
  • mergeMap(): একাধিক স্ট্রিমকে একযোগে প্রসেস করে।
  • debounceTime(): অতিরিক্ত ইভেন্ট থেকে পারফরম্যান্স অপটিমাইজ করে।
  • distinctUntilChanged(): ডুপ্লিকেট মান ফিল্টার করে।
  • combineLatest(): একাধিক স্ট্রিমের সর্বশেষ মান একত্রিত করে।
  • bufferTime(): নির্দিষ্ট সময়ে ইভেন্টগুলোকে গ্রুপ করে।

এই অপারেটরগুলো ব্যবহার করে আপনি RxJS এর সাহায্যে আরো দক্ষ এবং কার্যকরী ডেটা স্ট্রিম ম্যানেজমেন্ট করতে পারেন।

Content added By

RxJS (Reactive Extensions for JavaScript) একটি শক্তিশালী লাইব্রেরি যা অ্যাসিঙ্ক্রোনাস ডেটা স্ট্রিম এবং ইভেন্ট-ভিত্তিক প্রোগ্রামিং সহজভাবে পরিচালনা করতে সাহায্য করে। যখন আপনি complex data manipulation (যেমন, ডেটার ফিল্টারিং, ট্রান্সফর্মেশন, মার্জিং ইত্যাদি) করতে চান, RxJS আপনাকে একাধিক শক্তিশালী অপারেটর প্রদান করে যা ডেটা স্ট্রিমের সাথে জটিল কাজ সম্পাদন করতে সক্ষম।

এখানে, আমরা কিছু Advanced Techniques নিয়ে আলোচনা করব যা RxJS ব্যবহার করে complex data manipulation করতে সাহায্য করবে। এই টেকনিকগুলি stream transformation, combination, filtering এবং error handling সহ বিভিন্ন চ্যালেঞ্জের সমাধান দিতে পারে।


1. Combining Multiple Observables with combineLatest()

যখন একাধিক Observable-এর মান একসাথে প্রয়োজন হয়, তখন combineLatest() অপারেটরটি ব্যবহার করা হয়। এটি একাধিক Observable-এর সর্বশেষ মানগুলো একত্রিত করে একটি নতুন Observable তৈরি করে।

উদাহরণ:

import { combineLatest, of } from 'rxjs';

const obs1$ = of(1, 2, 3);
const obs2$ = of('A', 'B', 'C');

combineLatest([obs1$, obs2$]).subscribe(([val1, val2]) => {
  console.log(`Value 1: ${val1}, Value 2: ${val2}`);
});

আউটপুট:

Value 1: 3, Value 2: C

এখানে, combineLatest() দুইটি Observable-এর সর্বশেষ মান একত্রিত করেছে এবং সেই মানটি সাবস্ক্রাইবারে পাঠিয়েছে।


2. Advanced Filtering with filter() and takeWhile()

filter() অপারেটরটি Observable থেকে ডেটা ফিল্টার করতে ব্যবহৃত হয়। আপনি যদি কিছু নির্দিষ্ট শর্তে ডেটা ফিল্টার করতে চান, তবে এটি অত্যন্ত কার্যকর। আরেকটি অপারেটর takeWhile() আপনাকে স্ট্রিমে ফিল্টারিং করতে সহায়তা করে যতক্ষণ না একটি শর্ত পূর্ণ হয়।

উদাহরণ: filter() অপারেটর ব্যবহার করে

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  filter(num => num % 2 === 0) // Even numbers only
).subscribe(console.log);

আউটপুট:

2
4

এখানে, filter() অপারেটরটি শুধুমাত্র even সংখ্যাগুলো ফিল্টার করে প্রদর্শন করেছে।

উদাহরণ: takeWhile() অপারেটর ব্যবহার করে

import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  takeWhile(num => num < 4) // Take values while they are less than 4
).subscribe(console.log);

আউটপুট:

1
2
3

এখানে, takeWhile() অপারেটরটি num < 4 শর্ত পূর্ণ না হওয়া পর্যন্ত মান গ্রহণ করেছে।


3. Transforming Data with map(), mergeMap() and concatMap()

RxJS-এ ডেটার ট্রান্সফরমেশন একটি সাধারণ কাজ। map(), mergeMap(), এবং concatMap() অপারেটরগুলি বিভিন্ন ধরনের ডেটার ট্রান্সফরমেশন এবং ম্যানিপুলেশন করার জন্য ব্যবহৃত হয়।

map() উদাহরণ:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  map(num => num * 2) // Double the value
).subscribe(console.log);

আউটপুট:

2
4
6
8
10

এখানে, map() অপারেটরটি প্রতিটি মানকে ২ গুণ করে দিয়েছে।

mergeMap() এবং concatMap() উদাহরণ:

import { of } from 'rxjs';
import { mergeMap, concatMap } from 'rxjs/operators';

// mergeMap Example
const source$ = of(1, 2, 3);

source$.pipe(
  mergeMap(val => of(val * 2)) // Concurrent mapping
).subscribe(console.log);

// concatMap Example
source$.pipe(
  concatMap(val => of(val * 2)) // Sequential mapping
).subscribe(console.log);

mergeMap() সব মানকে একত্রে প্রসেস করে এবং concatMap() একে একে মান প্রসেস করে।


4. Handling Errors with catchError() and retryWhen()

Error handling RxJS-এ একটি গুরুত্বপূর্ণ দিক। যখন কোনো Observable ত্রুটি (error) দেয়, তখন catchError() অপারেটরটি সেই error হ্যান্ডল করতে ব্যবহৃত হয়। retryWhen() ত্রুটি হওয়ার পরে পুনরায় চেষ্টা করতে সাহায্য করে।

উদাহরণ: catchError() দিয়ে Error Handling

import { of } from 'rxjs';
import { catchError } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

source$.pipe(
  catchError(error => {
    console.error('Error occurred:', error);
    return of('Fallback Value');
  })
).subscribe(console.log);

এখানে, যদি কোনো ত্রুটি ঘটে, তবে catchError() ফালব্যাক ভ্যালু প্রদান করবে।

উদাহরণ: retryWhen() দিয়ে Retry Logic

import { throwError } from 'rxjs';
import { retryWhen, delay, take } from 'rxjs/operators';

const source$ = throwError('Temporary Error');

source$.pipe(
  retryWhen(errors => 
    errors.pipe(
      delay(1000), // Retry after 1 second
      take(3) // Retry up to 3 times
    )
  )
).subscribe({
  next: value => console.log(value),
  error: err => console.error('Final Error:', err)
});

এখানে, retryWhen() ত্রুটির পরে ৩ বার পুনরায় চেষ্টা করবে এবং এক সেকেন্ড পর পর retry করবে।


5. Throttling and Debouncing with debounceTime() and throttleTime()

Debouncing এবং Throttling techniques ব্যবহার করে আপনি কিছু ইভেন্ট বা কার্যকলাপের ফ্রিকোয়েন্সি সীমাবদ্ধ করতে পারেন। debounceTime() ইভেন্টের মধ্যে বিলম্ব প্রদান করে এবং throttleTime() নির্দিষ্ট সময় পর পর ইভেন্ট চালায়।

debounceTime() উদাহরণ:

import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Input field event
const input = document.getElementById('search');

fromEvent(input, 'input').pipe(
  debounceTime(500) // Wait for 500ms after last keystroke
).subscribe(event => {
  console.log('Search:', event.target.value);
});

এখানে, debounceTime() ব্যবহার করে, ইউজার ইন্টারঅ্যাকশনের পরে 500ms বিলম্বিতভাবে সাইড এফেক্ট ট্রিগার করা হয়েছে।

throttleTime() উদাহরণ:

import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

const input = document.getElementById('scroll');

fromEvent(input, 'scroll').pipe(
  throttleTime(2000) // Limit scroll event to once every 2 seconds
).subscribe(() => {
  console.log('Scrolled');
});

এখানে, throttleTime() ব্যবহার করে, আপনি ইভেন্টের ফ্রিকোয়েন্সি কমিয়ে দিতে পারেন, যাতে এটি প্রতি 2 সেকেন্ডে একবার ট্রিগার হয়।


সারাংশ

RxJS এর advanced techniques complex data manipulation এর জন্য অনেক শক্তিশালী সরঞ্জাম সরবরাহ করে। এই অপারেটরগুলি stream transformation, combination, error handling, এবং throttling/debouncing সহ বিভিন্ন ধরনের ডেটা প্রসেসিংয়ের জন্য ব্যবহৃত হয়।

  1. combineLatest(): একাধিক Observable এর সর্বশেষ মান একত্রিত করে।
  2. map(), mergeMap(), concatMap(): ডেটা ট্রান্সফর্মেশন এবং ম্যানিপুলেশন।
  3. catchError(), retryWhen(): Error handling এবং retry লজিক।
  4. debounceTime(), throttleTime(): Throttling এবং Debouncing techniques.

এই সব অপারেটর RxJS-এর শক্তিশালী ক্ষমতাগুলোর মধ্যে অন্তর্ভুক্ত, যা আপনার অ্যাসিঙ্ক্রোনাস এবং রিয়্যাক্টিভ ডেটা স্ট্রিমগুলি কার্যকরভাবে পরিচালনা করতে সহায়তা করে।

Content added By

RxJS (Reactive Extensions for JavaScript) একটি শক্তিশালী লাইব্রেরি যা বিভিন্ন ধরনের operators সরবরাহ করে, যা Observables থেকে ডেটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়। তবে, কখনও কখনও আপনি এমন কিছু কার্যকরী অপারেটর তৈরি করতে চাইবেন যা RxJS এর স্ট্যান্ডার্ড অপারেটরগুলির মধ্যে অন্তর্ভুক্ত না থাকে। এই ক্ষেত্রে, Custom Operators তৈরি করা একটি দরকারী পদ্ধতি।

RxJS-এ Custom Operators তৈরি করার মাধ্যমে আপনি নিজের প্রয়োজন অনুযায়ী স্ট্রিমে কার্যকলাপ যোগ করতে পারেন এবং এভাবে ডেটা প্রসেসিংয়ের প্রক্রিয়াকে আরও নির্দিষ্ট ও নিয়ন্ত্রণযোগ্য করতে পারবেন।


Custom Operators কী?

Custom Operators হল আপনার নিজের কাস্টম লজিকের ভিত্তিতে তৈরি অপারেটর। এটি RxJS স্ট্রিমে ডেটা ট্রান্সফরম বা ফিল্টার করার জন্য ব্যবহার করা যেতে পারে। Custom operators তৈরি করতে, আপনাকে higher-order function ব্যবহার করতে হবে, যা মূল Observable-এর উপর কাজ করবে এবং Observable রিটার্ন করবে।

Custom Operator তৈরি করার জন্য সাধারণভাবে নিম্নলিখিত ধাপগুলো অনুসরণ করা হয়:

  1. একটি ফাংশন তৈরি করা যা RxJS অপারেটরের মতো কাজ করবে।
  2. ফাংশনটি একটি Observable গ্রহণ করবে এবং তার সাথে একটি transformation বা side-effect পরিচালনা করবে।
  3. নতুন Observable রিটার্ন করা।

1. Custom Operator তৈরি করা

Custom operators তৈরি করতে, প্রথমে একটি ফাংশন তৈরি করতে হবে যা একটি Observable গ্রহন করবে এবং তার সাথে কাজ করবে। আমরা rxjs এর Observable.create() বা new Observable() ব্যবহার করতে পারি।

উদাহরণ: Custom double Operator তৈরি করা

ধরা যাক, আমরা একটি কাস্টম অপারেটর তৈরি করতে চাই, যা প্রতিটি সংখ্যাকে দুই গুণ করবে।

import { Observable } from 'rxjs';

// Custom operator: double
function double() {
  return (source) => new Observable(observer => {
    // subscribe to the source observable
    return source.subscribe({
      next(value) {
        // double the value
        observer.next(value * 2);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      }
    });
  });
}

এখানে, double() নামের একটি custom operator তৈরি করা হয়েছে যা একটি Observable স্ট্রিম গ্রহণ করবে এবং প্রতিটি মানকে দুই গুণ করবে।


2. Custom Operator ব্যবহার করা

এখন, এই কাস্টম অপারেটরটি ব্যবহার করে আমরা একটি Observable তৈরি করে তা সাবস্ক্রাইব করতে পারি।

import { of } from 'rxjs';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  double()  // Apply the custom operator
).subscribe(value => {
  console.log(value);  // Output: 2, 4, 6, 8, 10
});

এখানে, double() কাস্টম অপারেটরটি প্রতিটি মানকে দুই গুণ করেছে। আউটপুট হবে:

2
4
6
8
10

3. আরও উদাহরণ: Custom delayBy Operator

ধরা যাক, আমরা একটি কাস্টম অপারেটর তৈরি করতে চাই যা নির্দিষ্ট সময়ে প্রতিটি মান বিলম্বিত করবে।

import { Observable } from 'rxjs';

// Custom operator: delayBy
function delayBy(time: number) {
  return (source) => new Observable(observer => {
    return source.subscribe({
      next(value) {
        setTimeout(() => {
          observer.next(value);
        }, time);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      }
    });
  });
}

এখানে delayBy() অপারেটরটি একটি নির্দিষ্ট সময়ের জন্য প্রতিটি মান বিলম্বিত করবে। time প্যারামিটারটি বিলম্বের সময় নির্ধারণ করে।

উদাহরণ: delayBy অপারেটর ব্যবহার করা

import { of } from 'rxjs';

const numbers$ = of(1, 2, 3, 4);

numbers$.pipe(
  delayBy(1000)  // Apply the custom delayBy operator with 1 second delay
).subscribe(value => {
  console.log(value);  // Output: 1, 2, 3, 4 (with 1 second delay)
});

এখানে, delayBy(1000) ব্যবহার করে প্রতিটি মান ১ সেকেন্ড বিলম্বিত হয়ে সাবস্ক্রাইবারে পাঠানো হচ্ছে।


4. Custom Operator তৈরির সুবিধা

  • Code Reusability: একবার কাস্টম অপারেটর তৈরি করে আপনি তা বারবার ব্যবহার করতে পারেন, ফলে কোড পুনরাবৃত্তি কমে যায়।
  • Readability: Custom অপারেটরগুলোর মাধ্যমে আপনার কোডের পাঠযোগ্যতা বৃদ্ধি পায়, কারণ সেগুলি শুধুমাত্র একটি নির্দিষ্ট কাজ সম্পন্ন করে।
  • Separation of Concerns: যখন আপনার অ্যাসিঙ্ক্রোনাস বা রিয়্যাক্টিভ কোডে একটি নির্দিষ্ট লজিক প্রয়োগ করতে হয়, তখন কাস্টম অপারেটর তৈরি করা কোডের দায়িত্বগুলি স্পষ্ট করে।

5. একটি বাস্তব উদাহরণ: Custom retryWhen অপারেটর

ধরা যাক, আমরা একটি কাস্টম retryWhen অপারেটর তৈরি করতে চাই, যা নির্দিষ্ট সময়ের মধ্যে ইরর (error) ঘটলে পুনরায় চেষ্টা করবে।

import { Observable } from 'rxjs';

// Custom retryWhen operator
function customRetryWhen(retryCount: number, delay: number) {
  return (source) => new Observable(observer => {
    let attempts = 0;

    return source.subscribe({
      next(value) {
        observer.next(value);
      },
      error(err) {
        if (attempts < retryCount) {
          attempts++;
          console.log(`Retrying... attempt #${attempts}`);
          setTimeout(() => source.subscribe(observer), delay); // Retry after delay
        } else {
          observer.error(err);  // If retry limit exceeded, throw the error
        }
      },
      complete() {
        observer.complete();
      }
    });
  });
}

এখানে customRetryWhen() অপারেটরটি ত্রুটি ঘটলে নির্দিষ্ট সংখ্যক বার পুনরায় চেষ্টা করবে এবং প্রতিবার retry করার আগে কিছু সময় বিলম্ব করবে।

উদাহরণ: Custom retryWhen অপারেটর ব্যবহার

import { throwError, of } from 'rxjs';

const observable$ = throwError('Something went wrong');

observable$.pipe(
  customRetryWhen(3, 1000)  // Retry 3 times with 1 second delay
).subscribe({
  next: (value) => console.log(value),
  error: (err) => console.log('Final Error:', err)
});

এখানে, customRetryWhen(3, 1000) ৩ বার পুনরায় চেষ্টা করার জন্য কাস্টম লজিক প্রয়োগ করেছে, এবং প্রত্যেকবার ১ সেকেন্ডের বিলম্ব দিয়েছে।


সারাংশ

RxJS তে Custom Operators তৈরি করার মাধ্যমে আপনি অ্যাসিঙ্ক্রোনাস স্ট্রিমে আপনার প্রয়োজনীয় কার্যকলাপ সহজে যোগ করতে পারেন। আপনি যেকোনো ধরনের ট্রান্সফরমেশন, ফিল্টারিং, বা side-effects তৈরি করতে পারেন এবং সেগুলো স্ট্রিমের মধ্যে প্রয়োগ করতে পারেন।

কাস্টম অপারেটরের কিছু প্রধান সুবিধা:

  • Code Reusability: একবার তৈরি করা অপারেটর পুনরায় ব্যবহার করা যায়।
  • Readability: কোড আরও সহজ এবং পরিষ্কার হয়।
  • Flexibility: আপনার প্রয়োজন অনুযায়ী স্ট্রিমে যেকোনো লজিক প্রয়োগ করতে পারবেন।

এটি RxJS-এর শক্তিশালী বৈশিষ্ট্যগুলির একটি, যা আপনার অ্যাসিঙ্ক্রোনাস ডেটা প্রক্রিয়াকরণকে আরও নিয়ন্ত্রণযোগ্য এবং কার্যকরী করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...