Java 8 থেকে Parallel Streams এবং অন্যান্য নতুন কনকারেন্সি ফিচার যোগ করা হয়েছে, যা ডেটা প্রসেসিং এবং মাল্টি-কোর প্রসেসরের সর্বোত্তম ব্যবহার নিশ্চিত করে।
Parallel Streams হলো Java Streams API এর একটি বৈশিষ্ট্য, যা একই স্ট্রিম ডেটা মাল্টিপল থ্রেডে প্রসেস করতে দেয়। এটি ডেটা প্রক্রিয়াকরণে স্বয়ংক্রিয়ভাবে প্যারালালিজম ব্যবহার করে।
parallel()
মেথড দিয়ে প্যারালাল করা যায়।import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("John", "Jane", "Jack", "Jill");
// Sequential Stream
System.out.println("Sequential Stream:");
names.stream().forEach(name -> {
System.out.println(Thread.currentThread().getName() + " - " + name);
});
// Parallel Stream
System.out.println("\nParallel Stream:");
names.parallelStream().forEach(name -> {
System.out.println(Thread.currentThread().getName() + " - " + name);
});
}
}
আউটপুট:Sequential Stream
সব ডেটা মেইন থ্রেডে প্রসেস করে, আর Parallel Stream
মাল্টিপল থ্রেডে প্রসেস করে।
Java 8 এ কনকারেন্সি ব্যবস্থার উন্নতির জন্য বেশ কিছু নতুন ফিচার এবং API অন্তর্ভুক্ত করা হয়েছে। প্রধানগুলো হলো:
CompletableFuture
হলো Java 8 এ অ্যাসিনক্রোনাস টাস্ক পরিচালনার জন্য একটি API, যা মাল্টি-থ্রেডিংকে সহজ এবং কার্যকর করে।
উদাহরণ:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
});
// Wait for the future to complete
future.join();
}
}
runAsync()
: কোনো রিটার্ন ভ্যালু ছাড়াই টাস্ক চালু করে।supplyAsync()
: একটি রিটার্ন ভ্যালু সহ টাস্ক চালু করে।thenApply()
: আগের টাস্কের ফলাফল প্রসেস করে।Java 7 এ প্রবর্তিত হলেও, Java 8 এ ForkJoinPool
parallel streams এবং CompletableFuture এর মাধ্যমে অধিক কার্যকর হয়েছে। এটি RecursiveTask এবং RecursiveAction ব্যবহার করে কাজ ভাগ করে।
উদাহরণ:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
static class SumTask extends RecursiveTask<Integer> {
private final int[] numbers;
private final int start, end;
private static final int THRESHOLD = 5;
public SumTask(int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - start) <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(numbers, start, mid);
SumTask rightTask = new SumTask(numbers, mid, end);
leftTask.fork();
int rightResult = rightTask.compute();
int leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(numbers, 0, numbers.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
বৈশিষ্ট্য | Parallel Streams | CompletableFuture |
---|---|---|
লক্ষ্য | ডেটা প্রসেসিং (streams API) | অ্যাসিনক্রোনাস টাস্ক ম্যানেজমেন্ট |
বিল্ট-ইন Thread Pool | Common ForkJoinPool | Common ForkJoinPool বা কাস্টম থ্রেড পুল |
ব্যবহার ক্ষেত্র | স্ট্রিম ডেটা প্যারালাল প্রসেসিং | অ্যাসিনক্রোনাস টাস্ক চেইনিং |
কাস্টমাইজেশন | সীমিত | আরো নমনীয় |
Java 8 Parallel Stream API হলো একটি শক্তিশালী টুল যা parallelism ব্যবহার করে ডেটা প্রসেসিং সহজ এবং কার্যকরী করে। এটি মূলত বড় ডেটাসেট প্রক্রিয়াকরণে পারফরম্যান্স বৃদ্ধি করতে সাহায্য করে। Parallel Stream ব্যবহার করলে ফর্ক-জয়েন ফ্রেমওয়ার্ক (Fork-Join Framework) এর মাধ্যমে স্বয়ংক্রিয়ভাবে একাধিক থ্রেড ব্যবহার করা হয়।
Parallel Stream এর মাধ্যমে একাধিক থ্রেড ব্যবহার করা হয়, যেখানে সাধারণ Stream একটি থ্রেডে কাজ করে।
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
System.out.println("Normal Stream:");
IntStream.range(1, 10).forEach(System.out::println);
System.out.println("\nParallel Stream:");
IntStream.range(1, 10).parallel().forEach(System.out::println);
}
}
আউটপুট:
import java.util.Arrays;
import java.util.List;
public class ListParallelStreamExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("John", "Jane", "Tom", "Emily");
// Parallel Stream ব্যবহার
names.parallelStream()
.forEach(name -> System.out.println(Thread.currentThread().getName() + " processing " + name));
}
}
বৈশিষ্ট্য:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamFilterExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.collect(Collectors.toList());
System.out.println("Processed Even Numbers: " + evenNumbers);
}
}
স্ট্রিম অর্ডারিং প্রয়োজন হলে সতর্ক থাকুন:
forEachOrdered()
ব্যবহার করতে পারেন যদি অর্ডার প্রয়োজন হয়।IntStream.range(1, 10).parallel().forEachOrdered(System.out::println);
Custom Thread Pool প্রয়োজন হলে ব্যবহার করুন:
ForkJoinPool
কাস্টমাইজ করে কন্ট্রোল বাড়ানো যায়।ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
IntStream.range(1, 10).parallel().forEach(System.out::println);
}).join();
Java 8 এর Parallel Stream API ডেটা প্রক্রিয়াকরণের সময় সমান্তরাল প্রসেসিং সহজ এবং কার্যকরী করে। এটি বড় ডেটাসেট এবং CPU-বাউন্ড টাস্কগুলোর জন্য পারফেক্ট, তবে এর সীমাবদ্ধতাগুলো বুঝে সঠিকভাবে ব্যবহার করা উচিত। Parallel Stream ব্যবহার করলে প্রোগ্রামের কার্যকারিতা বাড়ানো সম্ভব, তবে ভুল ব্যবহারে পারফরম্যান্স কমে যেতে পারে।
Stream API এবং ForkJoinPool একত্রে ব্যবহার করলে জাভার মাল্টিপ্রসেসিং এবং প্যারালাল প্রোগ্রামিং আরও কার্যকর এবং সহজ হয়ে ওঠে।
Stream API জাভা ৮-এ যোগ করা হয়, যা ডেটা প্রক্রিয়াজাত করার জন্য একটি ডিক্লারেটিভ এবং কার্যকরী পদ্ধতি সরবরাহ করে। এটি লেজি-ইভালুয়েশন মডেল এবং প্যারালাল প্রসেসিং সমর্থন করে।
ForkJoinPool একটি বিশেষ ধরনের ExecutorService, যা টাস্কগুলোকে ছোট ছোট সাবটাস্কে বিভক্ত করে প্যারালাল প্রক্রিয়াজাত করতে ব্যবহার করা হয়। Stream API এর parallel() মেথডের মাধ্যমে এটি ব্যাকগ্রাউন্ডে কাজ করে।
Stream API স্বাভাবিকভাবে sequential হয়। যখন parallel() ব্যবহার করা হয়, এটি ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং করে।
উদাহরণ:
import java.util.stream.IntStream;
public class StreamWithForkJoin {
public static void main(String[] args) {
// Sequential Stream
System.out.println("Sequential Stream:");
IntStream.range(1, 10)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
// Parallel Stream
System.out.println("\nParallel Stream:");
IntStream.range(1, 10).parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}
}
ফলাফল:
Stream API ডিফল্টভাবে ForkJoinPool.commonPool() ব্যবহার করে। কিন্তু, কাস্টম ForkJoinPool ব্যবহার করাও সম্ভব।
কোড উদাহরণ:
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class CustomForkJoinPool {
public static void main(String[] args) {
ForkJoinPool customPool = new ForkJoinPool(4); // 4 থ্রেডের কাস্টম পুল
try {
customPool.submit(() -> {
IntStream.range(1, 10).parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
ব্যাখ্যা:
submit()
মেথডের মাধ্যমে টাস্ক সাবমিট করা হয়।import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class StreamForkJoinExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 20).boxed().collect(Collectors.toList());
// Default ForkJoinPool (commonPool)
System.out.println("Default Parallel Stream:");
numbers.parallelStream()
.map(i -> i * i)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
// Custom ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(3); // 3 থ্রেডের পুল
System.out.println("\nCustom ForkJoinPool:");
try {
customPool.submit(() -> {
numbers.parallelStream()
.map(i -> i * i)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
Stream API এবং ForkJoinPool ব্যবহার করে প্যারালাল প্রক্রিয়াজাতকরণে reduce() অপারেশন।
import java.util.stream.IntStream;
public class ParallelReduceExample {
public static void main(String[] args) {
int sum = IntStream.range(1, 100)
.parallel()
.reduce(0, Integer::sum);
System.out.println("Sum of numbers (Parallel Reduce): " + sum);
}
}
Stream.parallel()
ডিফল্ট ForkJoinPool ব্যবহার করে।import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class LargeDatasetProcessing {
public static void main(String[] args) {
List<Integer> largeDataset = IntStream.range(1, 1_000_000).boxed().collect(Collectors.toList());
ForkJoinPool customPool = new ForkJoinPool(8); // 8 থ্রেডের পুল
try {
customPool.submit(() -> {
long count = largeDataset.parallelStream()
.filter(i -> i % 2 == 0)
.count();
System.out.println("Count of even numbers: " + count);
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
Stream API এবং ForkJoinPool একত্রে প্যারালাল প্রসেসিং সহজ এবং কার্যকর করে। বিশেষত, বড় ডেটাসেট বা কম্পিউটেশনাল কাজের জন্য এটি উপযুক্ত। তবে ডেটাসেটের আকার এবং থ্রেড ব্যবহারের সঠিক ভারসাম্য নিশ্চিত করা গুরুত্বপূর্ণ।
Parallel Streams জাভার Streams API এর একটি অংশ, যা Fork/Join Framework ব্যবহার করে ডেটা প্রসেসিংকে মাল্টিপ্রসেসর কোরে ভাগ করে। এটি বড় ডেটা সেটের জন্য পারফরম্যান্স উন্নত করে এবং প্রোগ্রামিংকে আরও কার্যকর ও সহজ করে তোলে।
import java.util.List;
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
// বড় ডেটা সেট তৈরি
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
// Sequential Stream
long startTime = System.currentTimeMillis();
numbers.stream().forEach(ParallelStreamExample::process);
long endTime = System.currentTimeMillis();
System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");
// Parallel Stream
startTime = System.currentTimeMillis();
numbers.parallelStream().forEach(ParallelStreamExample::process);
endTime = System.currentTimeMillis();
System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
}
private static void process(Integer number) {
try {
Thread.sleep(1); // প্রতিটি আইটেম প্রক্রিয়া করতে সময় নেয়
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
আউটপুট:
Sequential Stream Time: 10000 ms
Parallel Stream Time: ~2000 ms
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamFilterExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
// Parallel Stream ব্যবহার করে ফিল্টার ও ম্যাপ করা
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0) // শুধুমাত্র জোড় সংখ্যা ফিল্টার
.map(n -> n * n) // প্রতিটি সংখ্যা স্কয়ার
.collect(Collectors.toList());
System.out.println("Even Numbers Count: " + evenNumbers.size());
}
}
যদি স্ট্রিমে কোনো সাইড এফেক্ট থাকে (যেমন ডেটা মডিফিকেশন), তাহলে Parallel Streams ব্যবহার না করাই ভালো।
// ভুল ব্যবহার: সাইড এফেক্ট সৃষ্টি করে
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
List<Integer> results = new ArrayList<>();
numbers.parallelStream().forEach(n -> results.add(n * n)); // ConcurrentModificationException হতে পারে
Parallel Streams শুধুমাত্র বড় ডেটাসেটের জন্য উপযোগী। ছোট ডেটাসেটের জন্য sequential()
ব্যবহার করুন।
প্যারামিটার | Sequential Streams | Parallel Streams |
---|---|---|
প্রসেসিং মডেল | একক থ্রেড ব্যবহার করে। | একাধিক থ্রেড ব্যবহার করে। |
পারফরম্যান্স | ছোট ডেটাসেটের জন্য কার্যকর। | বড় ডেটাসেটের জন্য কার্যকর। |
সিনট্যাক্স | stream() ব্যবহার করে। | parallelStream() ব্যবহার করে। |
থ্রেড কন্ট্রোল | কোনো থ্রেড ব্যবস্থাপনা নেই। | JVM স্বয়ংক্রিয়ভাবে থ্রেড নিয়ন্ত্রণ করে। |
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class CustomParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
ForkJoinPool customThreadPool = new ForkJoinPool(4); // কাস্টম থ্রেড পুল তৈরি
try {
customThreadPool.submit(() -> {
numbers.parallelStream().forEach(CustomParallelStreamExample::process);
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
private static void process(Integer number) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
এই পদ্ধতি সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং অনেক বেশি কার্যকর এবং দ্রুত করা সম্ভব।
জাভার Stream API ডেটা প্রসেসিংয়ের জন্য একটি শক্তিশালী টুল যা Sequential এবং Parallel Streams এ কাজ করতে পারে। উভয়ই ডেটা প্রসেসিংয়ে কার্যকর হলেও, তাদের ব্যবহার এবং পারফরম্যান্সে বড় পার্থক্য রয়েছে।
Sequential Stream ডেটা প্রসেসিং ধাপে ধাপে সম্পন্ন করে, যেখানে প্রতিটি ধাপ পরবর্তী ধাপ শুরু হওয়ার আগে শেষ হয়। এটি সিঙ্গেল থ্রেড ব্যবহার করে।
import java.util.stream.IntStream;
public class SequentialStreamExample {
public static void main(String[] args) {
System.out.println("Sequential Stream:");
IntStream.range(1, 10)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
}
}
আউটপুট (সম্ভাব্য):
Sequential Stream:
main : 1
main : 2
main : 3
...
main : 9
ব্যাখ্যা: main
থ্রেড Sequential Stream এর সব কাজ সম্পন্ন করে।
Parallel Stream ডেটা প্রসেসিং মাল্টিপল থ্রেডে বিভক্ত করে। এটি ForkJoinPool ব্যবহার করে Parallel Processing পরিচালনা করে।
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
System.out.println("Parallel Stream:");
IntStream.range(1, 10)
.parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
}
}
আউটপুট (সম্ভাব্য):
Parallel Stream:
ForkJoinPool.commonPool-worker-3 : 1
main : 2
ForkJoinPool.commonPool-worker-5 : 3
...
ব্যাখ্যা: একাধিক থ্রেড ব্যবহার করে কাজ দ্রুত সম্পন্ন হয়।
বৈশিষ্ট্য | Sequential Stream | Parallel Stream |
---|---|---|
থ্রেড ব্যবহারে কৌশল | সিঙ্গল থ্রেড | মাল্টিপল থ্রেড |
অর্ডার মেইন্টেন | উৎস ক্রম অনুযায়ী ডেটা প্রসেস করে | সবসময় উৎস ক্রম মেইন্টেন করে না |
পারফরম্যান্স | ছোট ডেটাসেটের জন্য কার্যকর | বড় ডেটাসেটের জন্য কার্যকর |
কোড সিম্প্লিসিটি | সহজ এবং ডিবাগিং সহজ | ডিবাগিং কিছুটা কঠিন |
উদ্দেশ্য | ছোট ডেটাসেট এবং সিঙ্গেল-থ্রেড প্রসেসিং | বড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং |
import java.util.stream.LongStream;
public class StreamPerformanceComparison {
public static void main(String[] args) {
long n = 10_000_000;
// Sequential Stream
long startTime = System.currentTimeMillis();
long sequentialSum = LongStream.rangeClosed(1, n).sum();
long endTime = System.currentTimeMillis();
System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");
// Parallel Stream
startTime = System.currentTimeMillis();
long parallelSum = LongStream.rangeClosed(1, n).parallel().sum();
endTime = System.currentTimeMillis();
System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
}
}
Sequential Stream Time: 120 ms
Parallel Stream Time: 45 ms
বিশ্লেষণ:
সঠিক অর্ডার প্রয়োজন হলে Parallel Stream এ forEachOrdered()
ব্যবহার করুন।
IntStream.range(1, 10)
.parallel()
.forEachOrdered(System.out::println);
Stateful Lambda Expression
List<Integer> list = new ArrayList<>();
IntStream.range(1, 100).parallel().forEach(list::add); // Unsafe!
স্ট্রিম টাইপ | সেরা ব্যবহারের পরিস্থিতি |
---|---|
Sequential | ছোট ডেটাসেট, ডিবাগিং সহজ, এবং উৎস ক্রম প্রয়োজন। |
Parallel | বড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং যেখানে পারফরম্যান্স গুরুত্বপূর্ণ। |
Sequential
এবং Parallel
Streams সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং আরও কার্যকর এবং কনকারেন্ট অ্যাপ্লিকেশন তৈরি সহজ হয়।
Read more