Stream API এবং ForkJoinPool এর Integration

Parallel Streams এবং Java 8 Concurrency - জাভা কনকারেন্সি (Java Concurrency) - Java Technologies

329

Stream API এবং ForkJoinPool একত্রে ব্যবহার করলে জাভার মাল্টিপ্রসেসিং এবং প্যারালাল প্রোগ্রামিং আরও কার্যকর এবং সহজ হয়ে ওঠে।


Stream API এর ভূমিকা

Stream API জাভা ৮-এ যোগ করা হয়, যা ডেটা প্রক্রিয়াজাত করার জন্য একটি ডিক্লারেটিভ এবং কার্যকরী পদ্ধতি সরবরাহ করে। এটি লেজি-ইভালুয়েশন মডেল এবং প্যারালাল প্রসেসিং সমর্থন করে।


ForkJoinPool এর ভূমিকা

ForkJoinPool একটি বিশেষ ধরনের ExecutorService, যা টাস্কগুলোকে ছোট ছোট সাবটাস্কে বিভক্ত করে প্যারালাল প্রক্রিয়াজাত করতে ব্যবহার করা হয়। Stream API এর parallel() মেথডের মাধ্যমে এটি ব্যাকগ্রাউন্ডে কাজ করে।


Stream API এবং ForkJoinPool এর Integration

১. Sequential Stream vs Parallel Stream

Stream API স্বাভাবিকভাবে sequential হয়। যখন parallel() ব্যবহার করা হয়, এটি ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং করে।

উদাহরণ:

import java.util.stream.IntStream;

public class StreamWithForkJoin {
    public static void main(String[] args) {
        // Sequential Stream
        System.out.println("Sequential Stream:");
        IntStream.range(1, 10)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));

        // Parallel Stream
        System.out.println("\nParallel Stream:");
        IntStream.range(1, 10).parallel()
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
    }
}

ফলাফল:

  • Sequential Stream: সব কাজ এক থ্রেডে সম্পন্ন হয়।
  • Parallel Stream: কাজগুলো বিভিন্ন থ্রেডে বিভক্ত হয়।

২. ForkJoinPool ব্যবহার কাস্টমাইজ করা

Stream API ডিফল্টভাবে ForkJoinPool.commonPool() ব্যবহার করে। কিন্তু, কাস্টম ForkJoinPool ব্যবহার করাও সম্ভব।

কোড উদাহরণ:

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomForkJoinPool {
    public static void main(String[] args) {
        ForkJoinPool customPool = new ForkJoinPool(4); // 4 থ্রেডের কাস্টম পুল

        try {
            customPool.submit(() -> {
                IntStream.range(1, 10).parallel()
                        .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

ব্যাখ্যা:

  • ForkJoinPool.commonPool() ব্যবহার না করে একটি কাস্টম পুল ডিফাইন করা হয়েছে।
  • submit() মেথডের মাধ্যমে টাস্ক সাবমিট করা হয়।

৩. প্যারালাল প্রসেসিং এর জন্য Stream API এবং ForkJoinPool ব্যবহার

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamForkJoinExample {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 20).boxed().collect(Collectors.toList());

        // Default ForkJoinPool (commonPool)
        System.out.println("Default Parallel Stream:");
        numbers.parallelStream()
                .map(i -> i * i)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));

        // Custom ForkJoinPool
        ForkJoinPool customPool = new ForkJoinPool(3); // 3 থ্রেডের পুল
        System.out.println("\nCustom ForkJoinPool:");
        try {
            customPool.submit(() -> {
                numbers.parallelStream()
                        .map(i -> i * i)
                        .forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

৪. Parallel Reduce অপারেশন

Stream API এবং ForkJoinPool ব্যবহার করে প্যারালাল প্রক্রিয়াজাতকরণে reduce() অপারেশন।

import java.util.stream.IntStream;

public class ParallelReduceExample {
    public static void main(String[] args) {
        int sum = IntStream.range(1, 100)
                .parallel()
                .reduce(0, Integer::sum);

        System.out.println("Sum of numbers (Parallel Reduce): " + sum);
    }
}

ForkJoinPool এর Behavior

  1. Recursive Task Division: ForkJoinPool কাজগুলোকে ছোট ছোট টাস্কে ভাগ করে (fork) এবং সেগুলো আলাদাভাবে সম্পন্ন করে (join)।
  2. Work Stealing: একটি থ্রেড যদি তার টাস্ক শেষ করে ফেলে, এটি অন্য থ্রেডের অসম্পূর্ণ টাস্ক নিয়ে কাজ করতে পারে।
  3. Common Pool: Stream.parallel() ডিফল্ট ForkJoinPool ব্যবহার করে।

Stream API এবং ForkJoinPool Integration এর সুবিধা

  1. Simple Parallelism: সহজেই প্যারালাল প্রসেসিং ইমপ্লিমেন্ট করা যায়।
  2. Efficiency: ForkJoinPool এর মাধ্যমে কার্যকর প্যারালাল প্রসেসিং।
  3. Scalability: বড় ডেটাসেটের জন্য কার্যকর।
  4. Work Stealing Algorithm: থ্রেডগুলোর মধ্যে লোড ব্যালেন্সিং।

ব্যবহারিক উদাহরণ: বড় ডেটাসেট প্রসেসিং

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LargeDatasetProcessing {
    public static void main(String[] args) {
        List<Integer> largeDataset = IntStream.range(1, 1_000_000).boxed().collect(Collectors.toList());

        ForkJoinPool customPool = new ForkJoinPool(8); // 8 থ্রেডের পুল
        try {
            customPool.submit(() -> {
                long count = largeDataset.parallelStream()
                        .filter(i -> i % 2 == 0)
                        .count();

                System.out.println("Count of even numbers: " + count);
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customPool.shutdown();
        }
    }
}

Stream API এবং ForkJoinPool Integration এর সীমাবদ্ধতা

  1. Overhead: ছোট ডেটাসেটের জন্য প্যারালাল প্রসেসিং অপ্রয়োজনীয় ও ধীর হতে পারে।
  2. Thread Contention: অত্যধিক থ্রেড ব্যবহার করলে ডেটা রেস এবং কনটেনশন সমস্যা হতে পারে।
  3. Complex Debugging: প্যারালাল স্ট্রিমে ডিবাগিং করা কঠিন।

Stream API এবং ForkJoinPool একত্রে প্যারালাল প্রসেসিং সহজ এবং কার্যকর করে। বিশেষত, বড় ডেটাসেট বা কম্পিউটেশনাল কাজের জন্য এটি উপযুক্ত। তবে ডেটাসেটের আকার এবং থ্রেড ব্যবহারের সঠিক ভারসাম্য নিশ্চিত করা গুরুত্বপূর্ণ।

Content added By
Promotion

Are you sure to start over?

Loading...