Parallel Streams এবং Java 8 Concurrency

Java Technologies - জাভা কনকারেন্সি (Java Concurrency)
81
81

Java 8 থেকে Parallel Streams এবং অন্যান্য নতুন কনকারেন্সি ফিচার যোগ করা হয়েছে, যা ডেটা প্রসেসিং এবং মাল্টি-কোর প্রসেসরের সর্বোত্তম ব্যবহার নিশ্চিত করে।


Parallel Streams

Parallel Streams হলো Java Streams API এর একটি বৈশিষ্ট্য, যা একই স্ট্রিম ডেটা মাল্টিপল থ্রেডে প্রসেস করতে দেয়। এটি ডেটা প্রক্রিয়াকরণে স্বয়ংক্রিয়ভাবে প্যারালালিজম ব্যবহার করে।


Parallel Streams ব্যবহার করার মূল বৈশিষ্ট্য

  1. সারল্য: সাধারণ স্ট্রিমকে সহজেই parallel() মেথড দিয়ে প্যারালাল করা যায়।
  2. বহু-কোর প্রসেসর ব্যবহার: মাল্টি-কোর প্রসেসরের শক্তি ব্যবহার করে বড় ডেটাসেট দ্রুত প্রসেস করা।
  3. ForkJoinPool ব্যবহার: Parallel Streams অভ্যন্তরীণভাবে ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং পরিচালনা করে।

Parallel Streams উদাহরণ

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("John", "Jane", "Jack", "Jill");

        // Sequential Stream
        System.out.println("Sequential Stream:");
        names.stream().forEach(name -> {
            System.out.println(Thread.currentThread().getName() + " - " + name);
        });

        // Parallel Stream
        System.out.println("\nParallel Stream:");
        names.parallelStream().forEach(name -> {
            System.out.println(Thread.currentThread().getName() + " - " + name);
        });
    }
}

আউটপুট:
Sequential Stream সব ডেটা মেইন থ্রেডে প্রসেস করে, আর Parallel Stream মাল্টিপল থ্রেডে প্রসেস করে।


Java 8 Concurrency ফিচার

Java 8 এ কনকারেন্সি ব্যবস্থার উন্নতির জন্য বেশ কিছু নতুন ফিচার এবং API অন্তর্ভুক্ত করা হয়েছে। প্রধানগুলো হলো:


১. CompletableFuture

CompletableFuture হলো Java 8 এ অ্যাসিনক্রোনাস টাস্ক পরিচালনার জন্য একটি API, যা মাল্টি-থ্রেডিংকে সহজ এবং কার্যকর করে।

উদাহরণ:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running in: " + Thread.currentThread().getName());
        });

        // Wait for the future to complete
        future.join();
    }
}

কমন মেথড:

  1. runAsync(): কোনো রিটার্ন ভ্যালু ছাড়াই টাস্ক চালু করে।
  2. supplyAsync(): একটি রিটার্ন ভ্যালু সহ টাস্ক চালু করে।
  3. thenApply(): আগের টাস্কের ফলাফল প্রসেস করে।

২. ForkJoinPool

Java 7 এ প্রবর্তিত হলেও, Java 8 এ ForkJoinPool parallel streams এবং CompletableFuture এর মাধ্যমে অধিক কার্যকর হয়েছে। এটি RecursiveTask এবং RecursiveAction ব্যবহার করে কাজ ভাগ করে।

উদাহরণ:

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinExample {
    static class SumTask extends RecursiveTask<Integer> {
        private final int[] numbers;
        private final int start, end;
        private static final int THRESHOLD = 5;

        public SumTask(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if ((end - start) <= THRESHOLD) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += numbers[i];
                }
                return sum;
            } else {
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(numbers, start, mid);
                SumTask rightTask = new SumTask(numbers, mid, end);

                leftTask.fork();
                int rightResult = rightTask.compute();
                int leftResult = leftTask.join();

                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        ForkJoinPool pool = new ForkJoinPool();

        SumTask task = new SumTask(numbers, 0, numbers.length);
        int result = pool.invoke(task);

        System.out.println("Sum: " + result);
    }
}

Parallel Streams vs CompletableFuture

বৈশিষ্ট্যParallel StreamsCompletableFuture
লক্ষ্যডেটা প্রসেসিং (streams API)অ্যাসিনক্রোনাস টাস্ক ম্যানেজমেন্ট
বিল্ট-ইন Thread PoolCommon ForkJoinPoolCommon ForkJoinPool বা কাস্টম থ্রেড পুল
ব্যবহার ক্ষেত্রস্ট্রিম ডেটা প্যারালাল প্রসেসিংঅ্যাসিনক্রোনাস টাস্ক চেইনিং
কাস্টমাইজেশনসীমিতআরো নমনীয়

Java 8 Concurrency-এর সুবিধা

  1. সারল্য: মাল্টি-থ্রেডিং সহজ এবং সংক্ষিপ্ত কোড।
  2. উন্নত পারফরম্যান্স: মাল্টি-কোর প্রসেসরের শক্তি ব্যবহার করে।
  3. ফিচার সমৃদ্ধ API: Parallel Streams, CompletableFuture, এবং Lambda Integration।

  • Parallel Streams: বড় ডেটাসেট দ্রুত প্রসেস করতে কার্যকর।
  • CompletableFuture: অ্যাসিনক্রোনাস টাস্ক ম্যানেজমেন্টে নমনীয়তা এবং শক্তি যোগ করে।
  • Java 8 এর কনকারেন্সি ফিচারগুলো উন্নত কর্মক্ষমতা, কার্যক্ষমতা, এবং ডেভেলপমেন্ট অভিজ্ঞতা প্রদান করে।
Content added By

Java 8 এর Parallel Stream API এর ভূমিকা

74
74

Java 8 Parallel Stream API হলো একটি শক্তিশালী টুল যা parallelism ব্যবহার করে ডেটা প্রসেসিং সহজ এবং কার্যকরী করে। এটি মূলত বড় ডেটাসেট প্রক্রিয়াকরণে পারফরম্যান্স বৃদ্ধি করতে সাহায্য করে। Parallel Stream ব্যবহার করলে ফর্ক-জয়েন ফ্রেমওয়ার্ক (Fork-Join Framework) এর মাধ্যমে স্বয়ংক্রিয়ভাবে একাধিক থ্রেড ব্যবহার করা হয়।


Parallel Stream API এর ভূমিকা

  1. ডেটা প্রক্রিয়াকরণে গতি বৃদ্ধি:
    • Parallel Stream একই ডেটাসেটের উপাদানগুলোকে সমান্তরালভাবে (parallelly) প্রক্রিয়াকরণ করে গতি বৃদ্ধি করে।
  2. মাল্টি-কোর প্রসেসরের ব্যবহার:
    • Parallel Stream মাল্টি-কোর প্রসেসরের সম্পূর্ণ ক্ষমতা ব্যবহার করে।
  3. কোড সরলতা:
    • মাল্টি-থ্রেডিং বা থ্রেড ম্যানেজমেন্টের প্রয়োজন ছাড়াই সমান্তরাল প্রসেসিং করা যায়।
  4. ফর্ক-জয়েন ফ্রেমওয়ার্ক ব্যবহার:
    • এটি কাজগুলোকে ছোট ছোট সাবটাস্কে ভাগ করে সমান্তরালভাবে প্রক্রিয়াকরণ করে এবং তারপর রেজাল্টগুলো একত্রিত করে।

Parallel Stream API এর ব্যবহার

১. সাধারণ Stream vs Parallel Stream

Parallel Stream এর মাধ্যমে একাধিক থ্রেড ব্যবহার করা হয়, যেখানে সাধারণ Stream একটি থ্রেডে কাজ করে।

import java.util.stream.IntStream;

public class ParallelStreamExample {
    public static void main(String[] args) {
        System.out.println("Normal Stream:");
        IntStream.range(1, 10).forEach(System.out::println);

        System.out.println("\nParallel Stream:");
        IntStream.range(1, 10).parallel().forEach(System.out::println);
    }
}

আউটপুট:

  • Normal Stream: 1 থেকে 9 পর্যন্ত সিরিয়াল আউটপুট।
  • Parallel Stream: 1 থেকে 9 পর্যন্ত অনিয়মিত (non-deterministic) ক্রমে আউটপুট, কারণ এটি একাধিক থ্রেড ব্যবহার করে।

২. লিস্ট প্রসেসিং

import java.util.Arrays;
import java.util.List;

public class ListParallelStreamExample {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("John", "Jane", "Tom", "Emily");

        // Parallel Stream ব্যবহার
        names.parallelStream()
             .forEach(name -> System.out.println(Thread.currentThread().getName() + " processing " + name));
    }
}

বৈশিষ্ট্য:

  • প্রতিটি নাম একটি ভিন্ন থ্রেডে প্রক্রিয়াকরণ হতে পারে।

৩. ফিল্টার এবং ম্যাপ অপারেশন

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamFilterExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        List<Integer> evenNumbers = numbers.parallelStream()
                                           .filter(n -> n % 2 == 0)
                                           .map(n -> n * 2)
                                           .collect(Collectors.toList());

        System.out.println("Processed Even Numbers: " + evenNumbers);
    }
}

Parallel Stream ব্যবহার করার সুবিধা

  1. পারফরম্যান্স বৃদ্ধি:
    • বড় ডেটাসেটের ক্ষেত্রে প্রসেসিং টাইম উল্লেখযোগ্যভাবে কমে যায়।
  2. সহজ ইমপ্লিমেন্টেশন:
    • মাল্টি-থ্রেডিং কোড না লিখেই সমান্তরাল প্রসেসিং করা যায়।
  3. অটোমেটিক থ্রেড ম্যানেজমেন্ট:
    • ফর্ক-জয়েন পুল স্বয়ংক্রিয়ভাবে থ্রেড ম্যানেজ করে।
  4. মাল্টি-কোর প্রসেসর ব্যবহার:
    • মাল্টি-কোর প্রসেসর থেকে সর্বাধিক পারফরম্যান্স পাওয়া যায়।

Parallel Stream ব্যবহার করার সীমাবদ্ধতা

  1. সিঙ্ক্রোনাইজেশন ঝুঁকি:
    • শেয়ারড রিসোর্স বা মিউটেবল ডেটার ক্ষেত্রে ডেটা রেস হতে পারে।
  2. ছোট ডেটাসেটে পারফরম্যান্স কমে যায়:
    • Parallel Stream ছোট ডেটাসেটের জন্য অপ্রয়োজনীয় থ্রেড ম্যানেজমেন্টের কারণে ধীর হতে পারে।
  3. অপটিমাইজেশনের অভাব:
    • Parallel Stream সব ধরনের কাজের জন্য পারফরম্যান্স বাড়াবে না। কিছু নির্দিষ্ট কাজ সিরিয়াল স্ট্রিমে দ্রুত হতে পারে।
  4. ডিবাগিং কঠিন:
    • একাধিক থ্রেডের কারণে Parallel Stream এর ডিবাগিং জটিল হতে পারে।

Parallel Stream এর Best Practices

  1. ছোট ডেটাসেট এড়িয়ে চলুন:
    • Parallel Stream বড় ডেটাসেটের জন্য উপযোগী।
  2. I/O-বাউন্ড কাজের জন্য ব্যবহার করবেন না:
    • CPU-বাউন্ড কাজের জন্য এটি বেশি কার্যকর। I/O-নির্ভর কাজ সিরিয়াল স্ট্রিমে ভালো হয়।
  3. স্ট্রিম অর্ডারিং প্রয়োজন হলে সতর্ক থাকুন:

    • Parallel Stream এর আউটপুট ক্রম অগত্যা অর্ডার মেনে চলবে না। forEachOrdered() ব্যবহার করতে পারেন যদি অর্ডার প্রয়োজন হয়।
    IntStream.range(1, 10).parallel().forEachOrdered(System.out::println);
    
  4. মিউটেবল অবজেক্ট ব্যবহার করবেন না:
    • Immutable বা Thread-Safe ডেটা স্ট্রাকচার ব্যবহার করুন।
  5. Custom Thread Pool প্রয়োজন হলে ব্যবহার করুন:

    • ForkJoinPool কাস্টমাইজ করে কন্ট্রোল বাড়ানো যায়।
    ForkJoinPool customPool = new ForkJoinPool(4);
    customPool.submit(() -> {
        IntStream.range(1, 10).parallel().forEach(System.out::println);
    }).join();
    

Java 8 এর Parallel Stream API ডেটা প্রক্রিয়াকরণের সময় সমান্তরাল প্রসেসিং সহজ এবং কার্যকরী করে। এটি বড় ডেটাসেট এবং CPU-বাউন্ড টাস্কগুলোর জন্য পারফেক্ট, তবে এর সীমাবদ্ধতাগুলো বুঝে সঠিকভাবে ব্যবহার করা উচিত। Parallel Stream ব্যবহার করলে প্রোগ্রামের কার্যকারিতা বাড়ানো সম্ভব, তবে ভুল ব্যবহারে পারফরম্যান্স কমে যেতে পারে।

Content added By

Stream API এবং ForkJoinPool এর Integration

96
96

Stream API এবং ForkJoinPool একত্রে ব্যবহার করলে জাভার মাল্টিপ্রসেসিং এবং প্যারালাল প্রোগ্রামিং আরও কার্যকর এবং সহজ হয়ে ওঠে।


Stream API এর ভূমিকা

Stream API জাভা ৮-এ যোগ করা হয়, যা ডেটা প্রক্রিয়াজাত করার জন্য একটি ডিক্লারেটিভ এবং কার্যকরী পদ্ধতি সরবরাহ করে। এটি লেজি-ইভালুয়েশন মডেল এবং প্যারালাল প্রসেসিং সমর্থন করে।


ForkJoinPool এর ভূমিকা

ForkJoinPool একটি বিশেষ ধরনের ExecutorService, যা টাস্কগুলোকে ছোট ছোট সাবটাস্কে বিভক্ত করে প্যারালাল প্রক্রিয়াজাত করতে ব্যবহার করা হয়। Stream API এর parallel() মেথডের মাধ্যমে এটি ব্যাকগ্রাউন্ডে কাজ করে।


Stream API এবং ForkJoinPool এর Integration

১. Sequential Stream vs Parallel Stream

Stream API স্বাভাবিকভাবে sequential হয়। যখন parallel() ব্যবহার করা হয়, এটি ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং করে।

উদাহরণ:

import java.util.stream.IntStream;

public class StreamWithForkJoin {
    public static void main(String[] args) {
        // Sequential Stream
        System.out.println("Sequential Stream:");
        IntStream.range(1, 10)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));

        // Parallel Stream
        System.out.println("\nParallel Stream:");
        IntStream.range(1, 10).parallel()
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
    }
}

ফলাফল:

  • Sequential Stream: সব কাজ এক থ্রেডে সম্পন্ন হয়।
  • Parallel Stream: কাজগুলো বিভিন্ন থ্রেডে বিভক্ত হয়।

২. ForkJoinPool ব্যবহার কাস্টমাইজ করা

Stream API ডিফল্টভাবে ForkJoinPool.commonPool() ব্যবহার করে। কিন্তু, কাস্টম ForkJoinPool ব্যবহার করাও সম্ভব।

কোড উদাহরণ:

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomForkJoinPool {
    public static void main(String[] args) {
        ForkJoinPool customPool = new ForkJoinPool(4); // 4 থ্রেডের কাস্টম পুল

        try {
            customPool.submit(() -> {
                IntStream.range(1, 10).parallel()
                        .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

ব্যাখ্যা:

  • ForkJoinPool.commonPool() ব্যবহার না করে একটি কাস্টম পুল ডিফাইন করা হয়েছে।
  • submit() মেথডের মাধ্যমে টাস্ক সাবমিট করা হয়।

৩. প্যারালাল প্রসেসিং এর জন্য Stream API এবং ForkJoinPool ব্যবহার

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamForkJoinExample {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 20).boxed().collect(Collectors.toList());

        // Default ForkJoinPool (commonPool)
        System.out.println("Default Parallel Stream:");
        numbers.parallelStream()
                .map(i -> i * i)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));

        // Custom ForkJoinPool
        ForkJoinPool customPool = new ForkJoinPool(3); // 3 থ্রেডের পুল
        System.out.println("\nCustom ForkJoinPool:");
        try {
            customPool.submit(() -> {
                numbers.parallelStream()
                        .map(i -> i * i)
                        .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

৪. Parallel Reduce অপারেশন

Stream API এবং ForkJoinPool ব্যবহার করে প্যারালাল প্রক্রিয়াজাতকরণে reduce() অপারেশন।

import java.util.stream.IntStream;

public class ParallelReduceExample {
    public static void main(String[] args) {
        int sum = IntStream.range(1, 100)
                .parallel()
                .reduce(0, Integer::sum);

        System.out.println("Sum of numbers (Parallel Reduce): " + sum);
    }
}

ForkJoinPool এর Behavior

  1. Recursive Task Division: ForkJoinPool কাজগুলোকে ছোট ছোট টাস্কে ভাগ করে (fork) এবং সেগুলো আলাদাভাবে সম্পন্ন করে (join)।
  2. Work Stealing: একটি থ্রেড যদি তার টাস্ক শেষ করে ফেলে, এটি অন্য থ্রেডের অসম্পূর্ণ টাস্ক নিয়ে কাজ করতে পারে।
  3. Common Pool: Stream.parallel() ডিফল্ট ForkJoinPool ব্যবহার করে।

Stream API এবং ForkJoinPool Integration এর সুবিধা

  1. Simple Parallelism: সহজেই প্যারালাল প্রসেসিং ইমপ্লিমেন্ট করা যায়।
  2. Efficiency: ForkJoinPool এর মাধ্যমে কার্যকর প্যারালাল প্রসেসিং।
  3. Scalability: বড় ডেটাসেটের জন্য কার্যকর।
  4. Work Stealing Algorithm: থ্রেডগুলোর মধ্যে লোড ব্যালেন্সিং।

ব্যবহারিক উদাহরণ: বড় ডেটাসেট প্রসেসিং

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LargeDatasetProcessing {
    public static void main(String[] args) {
        List<Integer> largeDataset = IntStream.range(1, 1_000_000).boxed().collect(Collectors.toList());

        ForkJoinPool customPool = new ForkJoinPool(8); // 8 থ্রেডের পুল
        try {
            customPool.submit(() -> {
                long count = largeDataset.parallelStream()
                        .filter(i -> i % 2 == 0)
                        .count();

                System.out.println("Count of even numbers: " + count);
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

Stream API এবং ForkJoinPool Integration এর সীমাবদ্ধতা

  1. Overhead: ছোট ডেটাসেটের জন্য প্যারালাল প্রসেসিং অপ্রয়োজনীয় ও ধীর হতে পারে।
  2. Thread Contention: অত্যধিক থ্রেড ব্যবহার করলে ডেটা রেস এবং কনটেনশন সমস্যা হতে পারে।
  3. Complex Debugging: প্যারালাল স্ট্রিমে ডিবাগিং করা কঠিন।

Stream API এবং ForkJoinPool একত্রে প্যারালাল প্রসেসিং সহজ এবং কার্যকর করে। বিশেষত, বড় ডেটাসেট বা কম্পিউটেশনাল কাজের জন্য এটি উপযুক্ত। তবে ডেটাসেটের আকার এবং থ্রেড ব্যবহারের সঠিক ভারসাম্য নিশ্চিত করা গুরুত্বপূর্ণ।

Content added By

Parallel Streams এর মাধ্যমে Performance Optimization

67
67

Parallel Streams জাভার Streams API এর একটি অংশ, যা Fork/Join Framework ব্যবহার করে ডেটা প্রসেসিংকে মাল্টিপ্রসেসর কোরে ভাগ করে। এটি বড় ডেটা সেটের জন্য পারফরম্যান্স উন্নত করে এবং প্রোগ্রামিংকে আরও কার্যকর ও সহজ করে তোলে।


Parallel Streams এর বৈশিষ্ট্য

  1. অ্যাসিঙ্ক্রোনাস প্রসেসিং: একাধিক থ্রেড ব্যবহার করে কাজ ভাগ করে।
  2. ডেটা প্যারালেলিজম: ডেটা অংশগুলো আলাদাভাবে প্রসেস হয়।
  3. ডিফল্ট থ্রেড পুল: JVM-এর common ForkJoinPool ব্যবহার করে, যা থ্রেড সংখ্যা কন্ট্রোল করে।
  4. সহজ সিনট্যাক্স: সাধারণ streams থেকে সহজেই প্যারালেল স্ট্রিমে রূপান্তর করা যায়।

Parallel Streams এর ব্যবহার

১. সাধারণ Parallel Streams উদাহরণ

import java.util.List;
import java.util.stream.IntStream;

public class ParallelStreamExample {
    public static void main(String[] args) {
        // বড় ডেটা সেট তৈরি
        List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();

        // Sequential Stream
        long startTime = System.currentTimeMillis();
        numbers.stream().forEach(ParallelStreamExample::process);
        long endTime = System.currentTimeMillis();
        System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");

        // Parallel Stream
        startTime = System.currentTimeMillis();
        numbers.parallelStream().forEach(ParallelStreamExample::process);
        endTime = System.currentTimeMillis();
        System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
    }

    private static void process(Integer number) {
        try {
            Thread.sleep(1); // প্রতিটি আইটেম প্রক্রিয়া করতে সময় নেয়
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

আউটপুট:

Sequential Stream Time: 10000 ms
Parallel Stream Time: ~2000 ms

২. Parallel Streams এর মাধ্যমে ডেটা ফিল্টার ও ম্যাপ করা

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamFilterExample {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();

        // Parallel Stream ব্যবহার করে ফিল্টার ও ম্যাপ করা
        List<Integer> evenNumbers = numbers.parallelStream()
                .filter(n -> n % 2 == 0) // শুধুমাত্র জোড় সংখ্যা ফিল্টার
                .map(n -> n * n)         // প্রতিটি সংখ্যা স্কয়ার
                .collect(Collectors.toList());

        System.out.println("Even Numbers Count: " + evenNumbers.size());
    }
}

Parallel Streams এর সুবিধা

  1. পারফরম্যান্স বৃদ্ধি: বড় ডেটা সেট প্রক্রিয়াকরণের সময় কম লাগে।
  2. সহজ ব্যবহার: Streams API এর উপর ভিত্তি করে সহজেই ব্যবহার করা যায়।
  3. কোর ইউটিলাইজেশন: মাল্টি-কোর প্রসেসর ব্যবহার করে কাজ ভাগ করা হয়।

Parallel Streams এর সীমাবদ্ধতা

  1. থ্রেড-সেফটি: যদি ডেটা স্ট্রাকচার থ্রেড-সেফ না হয়, তাহলে ConcurrentModificationException হতে পারে।
  2. কিছু ছোট ডেটাসেটে ওভারহেড: Parallel Streams ছোট ডেটাসেটের জন্য ওভারহেড তৈরি করতে পারে।
  3. উন্নত নিয়ন্ত্রণের অভাব: থ্রেড সংখ্যা সরাসরি নিয়ন্ত্রণ করা সম্ভব নয়; JVM এটি পরিচালনা করে।

Parallel Streams ব্যবহার করার সময় সতর্কতা

১. সাইড এফেক্ট এড়ানো

যদি স্ট্রিমে কোনো সাইড এফেক্ট থাকে (যেমন ডেটা মডিফিকেশন), তাহলে Parallel Streams ব্যবহার না করাই ভালো।

// ভুল ব্যবহার: সাইড এফেক্ট সৃষ্টি করে
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
List<Integer> results = new ArrayList<>();

numbers.parallelStream().forEach(n -> results.add(n * n)); // ConcurrentModificationException হতে পারে

২. সংক্ষিপ্ত প্রসেসিং

Parallel Streams শুধুমাত্র বড় ডেটাসেটের জন্য উপযোগী। ছোট ডেটাসেটের জন্য sequential() ব্যবহার করুন।


Parallel Streams বনাম Sequential Streams

প্যারামিটারSequential StreamsParallel Streams
প্রসেসিং মডেলএকক থ্রেড ব্যবহার করে।একাধিক থ্রেড ব্যবহার করে।
পারফরম্যান্সছোট ডেটাসেটের জন্য কার্যকর।বড় ডেটাসেটের জন্য কার্যকর।
সিনট্যাক্সstream() ব্যবহার করে।parallelStream() ব্যবহার করে।
থ্রেড কন্ট্রোলকোনো থ্রেড ব্যবস্থাপনা নেই।JVM স্বয়ংক্রিয়ভাবে থ্রেড নিয়ন্ত্রণ করে।

Parallel Streams এর কাস্টম থ্রেড পুল ব্যবহার

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();

        ForkJoinPool customThreadPool = new ForkJoinPool(4); // কাস্টম থ্রেড পুল তৈরি
        try {
            customThreadPool.submit(() -> {
                numbers.parallelStream().forEach(CustomParallelStreamExample::process);
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }

    private static void process(Integer number) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  1. Parallel Streams বড় ডেটাসেট প্রক্রিয়াকরণের জন্য কার্যকর এবং মাল্টি-কোর প্রসেসর ব্যবহার করে পারফরম্যান্স উন্নত করে।
  2. ব্যবহারের সতর্কতা: সাইড এফেক্ট এবং ছোট ডেটাসেটের জন্য এটি সাবধানে ব্যবহার করা উচিত।
  3. কাস্টম থ্রেড পুল: আরও নিয়ন্ত্রণের জন্য কাস্টম থ্রেড পুল ব্যবহার করা যায়।

এই পদ্ধতি সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং অনেক বেশি কার্যকর এবং দ্রুত করা সম্ভব।

Content added By

Sequential এবং Parallel Streams এর মধ্যে পার্থক্য

59
59

জাভার Stream API ডেটা প্রসেসিংয়ের জন্য একটি শক্তিশালী টুল যা Sequential এবং Parallel Streams এ কাজ করতে পারে। উভয়ই ডেটা প্রসেসিংয়ে কার্যকর হলেও, তাদের ব্যবহার এবং পারফরম্যান্সে বড় পার্থক্য রয়েছে।


Sequential Stream

Sequential Stream ডেটা প্রসেসিং ধাপে ধাপে সম্পন্ন করে, যেখানে প্রতিটি ধাপ পরবর্তী ধাপ শুরু হওয়ার আগে শেষ হয়। এটি সিঙ্গেল থ্রেড ব্যবহার করে।

বৈশিষ্ট্য:

  1. সিঙ্গল থ্রেড: Sequential Stream সব অপারেশন একক থ্রেডে সম্পন্ন করে।
  2. অর্ডার মেইন্টেন: ডেটা প্রসেসিং সর্বদা উৎসের ক্রম অনুযায়ী হয়।
  3. কমপ্লেক্স নয়: সিম্পল এবং ডিবাগিং সহজ।

উদাহরণ:

import java.util.stream.IntStream;

public class SequentialStreamExample {
    public static void main(String[] args) {
        System.out.println("Sequential Stream:");
        IntStream.range(1, 10)
                 .forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
    }
}

আউটপুট (সম্ভাব্য):

Sequential Stream:
main : 1
main : 2
main : 3
...
main : 9

ব্যাখ্যা: main থ্রেড Sequential Stream এর সব কাজ সম্পন্ন করে।


Parallel Stream

Parallel Stream ডেটা প্রসেসিং মাল্টিপল থ্রেডে বিভক্ত করে। এটি ForkJoinPool ব্যবহার করে Parallel Processing পরিচালনা করে।

বৈশিষ্ট্য:

  1. মাল্টিপল থ্রেড: Parallel Stream কাজকে একাধিক থ্রেডে ভাগ করে।
  2. পারফরম্যান্স উন্নতি: বড় ডেটাসেটের জন্য দ্রুত কাজ সম্পন্ন করে।
  3. অর্ডার অপরিহার্য নয়: কিছু ক্ষেত্রে উৎস ক্রম মেইন্টেন না-ও হতে পারে।

উদাহরণ:

import java.util.stream.IntStream;

public class ParallelStreamExample {
    public static void main(String[] args) {
        System.out.println("Parallel Stream:");
        IntStream.range(1, 10)
                 .parallel()
                 .forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
    }
}

আউটপুট (সম্ভাব্য):

Parallel Stream:
ForkJoinPool.commonPool-worker-3 : 1
main : 2
ForkJoinPool.commonPool-worker-5 : 3
...

ব্যাখ্যা: একাধিক থ্রেড ব্যবহার করে কাজ দ্রুত সম্পন্ন হয়।


Sequential এবং Parallel Streams এর মধ্যে পার্থক্য

বৈশিষ্ট্যSequential StreamParallel Stream
থ্রেড ব্যবহারে কৌশলসিঙ্গল থ্রেডমাল্টিপল থ্রেড
অর্ডার মেইন্টেনউৎস ক্রম অনুযায়ী ডেটা প্রসেস করেসবসময় উৎস ক্রম মেইন্টেন করে না
পারফরম্যান্সছোট ডেটাসেটের জন্য কার্যকরবড় ডেটাসেটের জন্য কার্যকর
কোড সিম্প্লিসিটিসহজ এবং ডিবাগিং সহজডিবাগিং কিছুটা কঠিন
উদ্দেশ্যছোট ডেটাসেট এবং সিঙ্গেল-থ্রেড প্রসেসিংবড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং

Sequential এবং Parallel Streams এর পারফরম্যান্স তুলনা

কোড: Sequential বনাম Parallel

import java.util.stream.LongStream;

public class StreamPerformanceComparison {
    public static void main(String[] args) {
        long n = 10_000_000;

        // Sequential Stream
        long startTime = System.currentTimeMillis();
        long sequentialSum = LongStream.rangeClosed(1, n).sum();
        long endTime = System.currentTimeMillis();
        System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");

        // Parallel Stream
        startTime = System.currentTimeMillis();
        long parallelSum = LongStream.rangeClosed(1, n).parallel().sum();
        endTime = System.currentTimeMillis();
        System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
    }
}

সম্ভাব্য আউটপুট:

Sequential Stream Time: 120 ms
Parallel Stream Time: 45 ms

বিশ্লেষণ:

  • Parallel Stream বড় ডেটাসেটের জন্য দ্রুত কাজ সম্পন্ন করে।
  • ছোট ডেটাসেটের জন্য Sequential Stream ভালো।

Best Practices

  1. ছোট ডেটাসেটের জন্য Sequential Stream ব্যবহার করুন।
    • Parallel Stream ছোট ডেটাসেটে ওভারহেড তৈরি করে।
  2. Parallel Stream ব্যবহার করার আগে প্রসেসিং টাইম পরিমাপ করুন।
    • Parallel Stream বড় ডেটাসেটের জন্যই কার্যকর।
  3. সঠিক অর্ডার প্রয়োজন হলে Parallel Stream এ forEachOrdered() ব্যবহার করুন।

    IntStream.range(1, 10)
             .parallel()
             .forEachOrdered(System.out::println);
    
  4. স্টেটলেস অপারেশন নিশ্চিত করুন।
    • Parallel Stream এ স্টেটফুল অপারেশন (যেমন গ্লোবাল ভেরিয়েবল পরিবর্তন) সমস্যা তৈরি করতে পারে।

Common Pitfalls

  1. Incorrect Use of Parallel Stream
    • ছোট ডেটাসেট বা কমপ্লেক্স ডেটা প্রসেসিংয়ে Parallel Stream ওভারহেড তৈরি করে এবং পারফরম্যান্স কমায়।
  2. Unordered Processing
    • Parallel Stream উৎস ক্রম বজায় না রাখলে ভুল ফলাফল হতে পারে।
  3. Stateful Lambda Expression

    • Parallel Stream এ স্টেটফুল ল্যাম্বডা এক্সপ্রেশন ডেটা ইনকনসিস্টেন্সি সৃষ্টি করতে পারে।
    List<Integer> list = new ArrayList<>();
    IntStream.range(1, 100).parallel().forEach(list::add); // Unsafe!
    
  4. ForkJoinPool Overhead
    • Parallel Stream অতিরিক্ত থ্রেড ব্যবহারে ForkJoinPool ওভারলোড হতে পারে।

স্ট্রিম টাইপসেরা ব্যবহারের পরিস্থিতি
Sequentialছোট ডেটাসেট, ডিবাগিং সহজ, এবং উৎস ক্রম প্রয়োজন।
Parallelবড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং যেখানে পারফরম্যান্স গুরুত্বপূর্ণ।

Sequential এবং Parallel Streams সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং আরও কার্যকর এবং কনকারেন্ট অ্যাপ্লিকেশন তৈরি সহজ হয়।

Content added By
টপ রেটেড অ্যাপ

স্যাট অ্যাকাডেমী অ্যাপ

আমাদের অল-ইন-ওয়ান মোবাইল অ্যাপের মাধ্যমে সীমাহীন শেখার সুযোগ উপভোগ করুন।

ভিডিও
লাইভ ক্লাস
এক্সাম
ডাউনলোড করুন
Promotion