Producer-Consumer Pattern মাল্টিথ্রেডেড প্রোগ্রামিংয়ে একটি সাধারণ প্যাটার্ন যেখানে একটি থ্রেড প্রযোজক (Producer) হিসাবে কাজ করে এবং ডেটা তৈরি করে, এবং অন্য থ্রেড ভোক্তা (Consumer) হিসাবে কাজ করে এবং ডেটা প্রক্রিয়া করে। এই দুই থ্রেডের মধ্যে ডেটা শেয়ার করার জন্য একটি থ্রেড-সেফ কিউ ব্যবহৃত হয়।
BlockingQueue এই প্যাটার্ন বাস্তবায়নের জন্য একটি জনপ্রিয় ডেটা স্ট্রাকচার কারণ এটি স্বয়ংক্রিয়ভাবে থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে।
BlockingQueue কীভাবে কাজ করে?
put(E e): প্রযোজক একটি আইটেম কিউতে যোগ করে। যদি কিউ পূর্ণ থাকে, এটি ব্লক করে।take(): ভোক্তা কিউ থেকে একটি আইটেম গ্রহণ করে। যদি কিউ খালি থাকে, এটি ব্লক করে।- ইমপ্লিমেন্টেশন:
ArrayBlockingQueue: একটি নির্দিষ্ট আকারের ব্লকিং কিউ।LinkedBlockingQueue: ডায়নামিক আকারের কিউ যা লিংকড লিস্ট দিয়ে তৈরি।
Producer-Consumer Pattern এর উদাহরণ
কোড উদাহরণ: BlockingQueue ব্যবহার করে Producer-Consumer
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
// BlockingQueue এর একটি উদাহরণ তৈরি করুন
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("Producer produced: " + i);
queue.put(i); // আইটেম যোগ করা
Thread.sleep(500); // কিছু সময় বিরতি
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// ভোক্তা থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take(); // আইটেম গ্রহণ করা
System.out.println("Consumer consumed: " + item);
Thread.sleep(1000); // কিছু সময় বিরতি
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// থ্রেড চালু করা
producer.start();
consumer.start();
}
}
কোড বিশ্লেষণ
- BlockingQueue ব্যবহার:
ArrayBlockingQueueব্যবহার করা হয়েছে, যা ৫টি উপাদান পর্যন্ত সংরক্ষণ করতে পারে। - Producer Logic:
- আইটেম তৈরি করে
put()ব্যবহার করে কিউতে যোগ করে। - কিউ পূর্ণ হলে থ্রেড ব্লক করে এবং অপেক্ষা করে।
- আইটেম তৈরি করে
- Consumer Logic:
- কিউ থেকে আইটেম গ্রহণ করে
take()ব্যবহার করে। - কিউ খালি হলে থ্রেড ব্লক করে এবং অপেক্ষা করে।
- কিউ থেকে আইটেম গ্রহণ করে
- Synchronization:
BlockingQueueস্বয়ংক্রিয়ভাবে থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে, তাই আলাদাভাবে লকিং দরকার হয় না।
আরও উন্নত উদাহরণ: প্রযোজক-ভোক্তা একটি নির্দিষ্ট সময়ে বন্ধ হবে
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerWithTermination {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
boolean[] stopFlag = {false}; // থ্রেড বন্ধ করার জন্য একটি ফ্ল্যাগ
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("Producer produced: " + i);
queue.put(i);
Thread.sleep(500);
}
stopFlag[0] = true; // উৎপাদন শেষ
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// ভোক্তা থ্রেড
Thread consumer = new Thread(() -> {
try {
while (!stopFlag[0] || !queue.isEmpty()) {
Integer item = queue.take();
System.out.println("Consumer consumed: " + item);
Thread.sleep(1000);
}
System.out.println("Consumer exiting as no more items.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
উপকারিতা
- লক-ফ্রি অপারেশন:
BlockingQueueলকিং ছাড়াই থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে। - সহজ ইন্টারফেস:
put()এবংtake()সরাসরি থ্রেডের কাজ পরিচালনা করে। - মাল্টিপ্রোডিউসার-মাল্টিকনজিউমার: একাধিক প্রযোজক ও ভোক্তা সহজে যুক্ত করা যায়।
প্র্যাকটিকাল ব্যবহারের ক্ষেত্র
- লগ প্রসেসিং সিস্টেম: লগ প্রযোজক ডেটা তৈরি করে এবং প্রসেসিং থ্রেড তা প্রক্রিয়াজাত করে।
- টাস্ক প্রসেসিং: ওয়ার্কার থ্রেড পুল ব্যবহার করে কাজ ভাগ করা।
- রিয়েল-টাইম ডেটা স্ট্রিম: ডেটা ক্যাপচার (Producer) এবং অ্যানালিটিকস প্রসেসিং (Consumer)।
BlockingQueue এবং Producer-Consumer Pattern মাল্টিথ্রেডেড প্রোগ্রামিং সহজ এবং কার্যকর করে। এটি ডেটা ভাগাভাগি এবং থ্রেড সিঙ্ক্রোনাইজেশন নিশ্চিত করতে ব্যবহৃত হয়। সঠিক ব্যবহার নিশ্চিত করলে এটি উচ্চ কার্যক্ষম এবং নির্ভরযোগ্য সমাধান প্রদান করে।
BlockingQueue হল একটি থ্রেড-সেফ কিউ, যা প্রযোজক-গ্রাহক (Producer-Consumer) প্যাটার্নে ব্যবহৃত হয়। এটি জাভার java.util.concurrent প্যাকেজের অংশ এবং থ্রেড সিঙ্ক্রোনাইজেশনের জন্য ব্লকিং অপারেশন প্রদান করে।
BlockingQueue এর বৈশিষ্ট্য
- থ্রেড-সেফ: একাধিক থ্রেড একসাথে একই কিউ ব্যবহার করতে পারে।
- ব্লকিং মেকানিজম:
- যদি কিউ খালি থাকে,
take()মেথড থ্রেডকে ব্লক করে রাখে যতক্ষণ না কোনো আইটেম যোগ করা হয়। - যদি কিউ পূর্ণ থাকে,
put()মেথড থ্রেডকে ব্লক করে রাখে যতক্ষণ না কোনো স্থান খালি হয়।
- যদি কিউ খালি থাকে,
- ইমপ্লিমেন্টেশন: এর অনেক ইমপ্লিমেন্টেশন রয়েছে:
ArrayBlockingQueue: নির্দিষ্ট আকারের কিউ।LinkedBlockingQueue: লিংকড লিস্ট ব্যবহার করে।PriorityBlockingQueue: অর্ডার্ড উপাদান সংরক্ষণ করে।DelayQueue: ডিলেই অপারেশনের জন্য ব্যবহৃত হয়।
BlockingQueue এর মেথডসমূহ
put(E e): কিউতে একটি উপাদান যোগ করে; যদি কিউ পূর্ণ হয়, এটি ব্লক করে।take(): কিউ থেকে একটি উপাদান নেয়; যদি কিউ খালি হয়, এটি ব্লক করে।offer(E e): একটি উপাদান যোগ করার চেষ্টা করে; তাৎক্ষণিক ব্যর্থ হলেfalseরিটার্ন করে।poll(): একটি উপাদান নেওয়ার চেষ্টা করে; কিউ খালি থাকলেnullরিটার্ন করে।
BlockingQueue এর ব্যবহার: প্রযোজক-গ্রাহক উদাহরণ
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
// ৫টি উপাদানের সীমা সহ একটি BlockingQueue তৈরি করা
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i); // কিউতে মান যোগ করা
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // কিউ থেকে মান নেওয়া
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
BlockingQueue এর ব্যবহার: ফিক্সড থ্রেড পুল সহ উদাহরণ
import java.util.concurrent.*;
public class ThreadPoolWithBlockingQueue {
public static void main(String[] args) {
BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(10);
// ThreadPoolExecutor তৈরি করা
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // কোর থ্রেড
4, // সর্বাধিক থ্রেড
60, // আইডল থ্রেডের জন্য অপেক্ষার সময়
TimeUnit.SECONDS,
taskQueue
);
// টাস্ক যোগ করা
for (int i = 1; i <= 15; i++) {
int taskNumber = i;
executor.execute(() -> {
System.out.println("Executing Task: " + taskNumber);
try {
Thread.sleep(1000); // কাজ সম্পন্ন করা
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
BlockingQueue এর বিভিন্ন ইমপ্লিমেন্টেশন
১. ArrayBlockingQueue
- নির্দিষ্ট আকারের।
- প্রথমে ইনসার্ট, প্রথমে রিমুভ (FIFO)।
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
২. LinkedBlockingQueue
- ডায়নামিক আকারের।
- বড় ডেটা সেটের জন্য উপযুক্ত।
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
৩. PriorityBlockingQueue
- উপাদানগুলিকে অর্ডার করে রাখে।
Comparatorব্যবহার করা যায়।
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
৪. DelayQueue
- উপাদানগুলোকে ডিলেই সময় পর্যন্ত হোল্ড করে।
Delayedইন্টারফেস ব্যবহার করা হয়।
BlockingQueue<DelayedTask> queue = new DelayQueue<>();
BlockingQueue এর সুবিধা
- থ্রেড-সেফ: একাধিক থ্রেড একসাথে কাজ করতে পারে।
- ব্লকিং মেকানিজম: ব্যস্ত অপেক্ষা (busy-waiting) প্রতিরোধ করে।
- মাল্টি-থ্রেড প্রোগ্রামিংয়ে সহজ: প্রযোজক-গ্রাহক প্যাটার্ন সহজ করে।
BlockingQueue এর সীমাবদ্ধতা
- ফুল কিউ: যদি কিউ পূর্ণ হয়, তখন
put()থ্রেডকে ব্লক করে। - খালি কিউ: যদি কিউ খালি থাকে, তখন
take()থ্রেডকে ব্লক করে। - ডিলেইড প্রসেসিং: বড় ডেটা সেটের জন্য অপেক্ষা সময় বাড়তে পারে।
BlockingQueue হল জাভার কনকারেন্সি মডেলের একটি অত্যন্ত কার্যকর ডেটা স্ট্রাকচার, যা থ্রেড-সেফ এবং ব্লকিং মেকানিজম সমর্থন করে। এটি প্রযোজক-গ্রাহক প্যাটার্ন, থ্রেড পুল, এবং ডেলেইড প্রসেসিং এর মত কাজ সহজ করে। সঠিক ইমপ্লিমেন্টেশন এবং অপারেশন চয়নের মাধ্যমে এটি মাল্টি-থ্রেডেড প্রোগ্রামিংকে আরও কার্যকর ও উন্নত করে।
Producer-Consumer Problem একটি ক্লাসিক কনকারেন্সি সমস্যা, যেখানে একটি প্রযোজক (Producer) ডেটা তৈরি করে এবং একটি গ্রাহক (Consumer) সেই ডেটা ব্যবহার করে। এই সমস্যা সমাধানের জন্য প্রযোজক এবং গ্রাহকের মধ্যে সিঙ্ক্রোনাইজেশন দরকার যাতে ডেটা লস বা ডেডলক না ঘটে।
সমাধান কৌশল
- BlockingQueue ব্যবহার করে: জাভার
BlockingQueueক্লাস এই সমস্যার জন্য সহজ ও কার্যকর সমাধান প্রদান করে। - wait()-notify() মেথড ব্যবহার করে: ম্যানুয়ালি থ্রেড সিঙ্ক্রোনাইজেশনের মাধ্যমে সমস্যার সমাধান করা।
- Locks এবং Conditions ব্যবহার করে: লক এবং কন্ডিশন ভেরিয়েবল ব্যবহার করে আরও কাস্টমাইজড সমাধান।
BlockingQueue ব্যবহার করে সমাধান
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // কিউ-এর আকার ৫
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i); // কিউতে ডেটা যোগ করা
System.out.println("Produced: " + i);
Thread.sleep(500); // স্লিপ করে ধীরে উৎপাদন
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
int value = queue.take(); // কিউ থেকে ডেটা পড়া
System.out.println("Consumed: " + value);
Thread.sleep(1000); // স্লিপ করে ধীরে গ্রাহন
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
কোডের ব্যাখ্যা
put()এবংtake():BlockingQueueস্বয়ংক্রিয়ভাবে ব্লক করে যদি কিউ পূর্ণ বা খালি থাকে।- সিঙ্ক্রোনাইজেশন: থ্রেড সেফটি নিশ্চিত করতে কোনও অতিরিক্ত লক বা
wait-notifyপ্রয়োজন নেই।
wait()-notify() ব্যবহার করে সমাধান
import java.util.LinkedList;
class SharedResource {
private final LinkedList<Integer> list = new LinkedList<>();
private final int CAPACITY = 5;
public synchronized void produce(int value) throws InterruptedException {
while (list.size() == CAPACITY) {
wait(); // কিউ পূর্ণ হলে অপেক্ষা
}
list.add(value);
System.out.println("Produced: " + value);
notify(); // গ্রাহককে জাগ্রত করা
}
public synchronized int consume() throws InterruptedException {
while (list.isEmpty()) {
wait(); // কিউ খালি হলে অপেক্ষা
}
int value = list.removeFirst();
System.out.println("Consumed: " + value);
notify(); // প্রযোজককে জাগ্রত করা
return value;
}
}
public class ProducerConsumerWithWaitNotify {
public static void main(String[] args) {
SharedResource sharedResource = new SharedResource();
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
sharedResource.produce(i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
sharedResource.consume();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
কোডের ব্যাখ্যা
wait()এবংnotify():- যখন কিউ পূর্ণ থাকে, প্রযোজক থ্রেড
wait()করে। - যখন কিউ খালি থাকে, গ্রাহক থ্রেড
wait()করে। notify()ব্যবহার করে অপেক্ষমান থ্রেডকে জাগ্রত করা হয়।
- যখন কিউ পূর্ণ থাকে, প্রযোজক থ্রেড
- সিঙ্ক্রোনাইজড ব্লক: মাল্টিথ্রেডেড এক্সেসকে সুরক্ষিত করে।
Locks এবং Conditions ব্যবহার করে সমাধান
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SharedResourceWithLock {
private final LinkedList<Integer> list = new LinkedList<>();
private final int CAPACITY = 5;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (list.size() == CAPACITY) {
notFull.await(); // কিউ পূর্ণ হলে অপেক্ষা
}
list.add(value);
System.out.println("Produced: " + value);
notEmpty.signal(); // গ্রাহককে জাগ্রত করা
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (list.isEmpty()) {
notEmpty.await(); // কিউ খালি হলে অপেক্ষা
}
int value = list.removeFirst();
System.out.println("Consumed: " + value);
notFull.signal(); // প্রযোজককে জাগ্রত করা
return value;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumerWithLocks {
public static void main(String[] args) {
SharedResourceWithLock sharedResource = new SharedResourceWithLock();
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
sharedResource.produce(i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
sharedResource.consume();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
কোডের ব্যাখ্যা
LockএবংCondition:notFull.await()এবংnotEmpty.await()নির্দিষ্ট কন্ডিশনে থ্রেডকে অপেক্ষা করায়।notFull.signal()এবংnotEmpty.signal()ব্যবহার করে অপেক্ষমান থ্রেডকে জাগ্রত করা হয়।
- উন্নত নিয়ন্ত্রণ:
Conditionব্যবহারে নির্দিষ্ট কন্ডিশনের উপর ভিত্তি করে সিঙ্ক্রোনাইজেশন সহজ হয়।
- BlockingQueue: সরল এবং কার্যকর।
BlockingQueueব্যবহার করা সর্বাধিক সুপারিশকৃত সমাধান। - wait()-notify(): বেশি কাস্টমাইজড এবং ম্যানুয়াল কনট্রোলের জন্য উপযোগী।
- Locks এবং Conditions: জটিল সমস্যার জন্য যেখানে স্পষ্ট এবং নির্দিষ্ট নিয়ন্ত্রণ প্রয়োজন।
Producer-Consumer সমস্যা সমাধানে এই পদ্ধতিগুলি জাভার কনকারেন্সি ফ্রেমওয়ার্কের শক্তি এবং নমনীয়তা প্রমাণ করে।
ArrayBlockingQueue এবং LinkedBlockingQueue হল জাভার কনকারেন্সি ফ্রেমওয়ার্কে BlockingQueue ইন্টারফেসের দুটি গুরুত্বপূর্ণ ইমপ্লিমেন্টেশন। এগুলো মূলত প্রযোজক-গ্রাহক (Producer-Consumer) প্যাটার্নে ব্যবহৃত হয় এবং থ্রেড সিঙ্ক্রোনাইজেশন নিশ্চিত করে।
১. ArrayBlockingQueue
ArrayBlockingQueue হলো একটি ফিক্সড সাইজের ব্লকিং কিউ, যেখানে একটি নির্দিষ্ট আকারের অ্যারে ব্যবহার করা হয়। এটি FIFO (First-In-First-Out) অর্ডারে কাজ করে।
বৈশিষ্ট্য:
- পূর্বনির্ধারিত আকার (capacity)।
- একসাথে একাধিক থ্রেড দ্বারা ব্যবহৃত হতে পারে।
- থ্রেড-সেফ, যেখানে প্রযোজক এবং গ্রাহক লকিং মেকানিজম ব্যবহার করে।
উদাহরণ:
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueueExample {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // আকার 5
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
queue.put(i); // কিউতে মান যোগ করা
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // কিউ থেকে মান নেওয়া
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
২. LinkedBlockingQueue
LinkedBlockingQueue হলো একটি ডাইনামিক সাইজের ব্লকিং কিউ, যা লিঙ্কড লিস্ট ব্যবহার করে তৈরি করা হয়। এটি FIFO অর্ডারে কাজ করে এবং বড় আকারের ডেটা পরিচালনার জন্য উপযুক্ত।
বৈশিষ্ট্য:
- ফিক্সড বা আনলিমিটেড সাইজ হতে পারে।
- প্রযোজক এবং গ্রাহকের জন্য পৃথক লক ব্যবহার করে, ফলে লক প্রতিযোগিতা (lock contention) কম হয়।
- বড় ডেটাসেট পরিচালনার জন্য কার্যকর।
উদাহরণ:
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueExample {
public static void main(String[] args) {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // ফিক্সড আকার 3
// প্রযোজক থ্রেড
Thread producer = new Thread(() -> {
try {
queue.put("One");
System.out.println("Produced: One");
queue.put("Two");
System.out.println("Produced: Two");
queue.put("Three");
System.out.println("Produced: Three");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// গ্রাহক থ্রেড
Thread consumer = new Thread(() -> {
try {
while (true) {
String value = queue.take(); // কিউ থেকে মান নেওয়া
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
৩. ArrayBlockingQueue এবং LinkedBlockingQueue এর তুলনা
| বৈশিষ্ট্য | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| মেমোরি স্ট্রাকচার | ফিক্সড সাইজ অ্যারে | লিঙ্কড লিস্ট |
| ডেটা সাইজ | পূর্বনির্ধারিত (capacity বাধ্যতামূলক) | ফিক্সড বা আনলিমিটেড |
| লক ব্যবস্থাপনা | একক লক | পৃথক লক (প্রযোজক ও গ্রাহকের জন্য আলাদা) |
| পারফরম্যান্স | ছোট ডেটাসেটের জন্য কার্যকর | বড় ডেটাসেটের জন্য কার্যকর |
| ব্যবহার ক্ষেত্র | সীমিত ডেটা পরিচালনা | বড় বা ডাইনামিক ডেটাসেট |
৪. দুইটি একত্রে ব্যবহার: উদাহরণ
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class CombinedBlockingQueueExample {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> arrayQueue = new ArrayBlockingQueue<>(3);
LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();
// প্রযোজক থ্রেড (ArrayBlockingQueue)
Thread arrayProducer = new Thread(() -> {
try {
for (int i = 1; i <= 3; i++) {
arrayQueue.put(i);
System.out.println("ArrayQueue Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// গ্রাহক থ্রেড (ArrayBlockingQueue)
Thread arrayConsumer = new Thread(() -> {
try {
while (!arrayQueue.isEmpty()) {
Integer value = arrayQueue.take();
System.out.println("ArrayQueue Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// প্রযোজক থ্রেড (LinkedBlockingQueue)
Thread linkedProducer = new Thread(() -> {
try {
linkedQueue.put("One");
linkedQueue.put("Two");
linkedQueue.put("Three");
System.out.println("LinkedQueue Produced: Three items");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// গ্রাহক থ্রেড (LinkedBlockingQueue)
Thread linkedConsumer = new Thread(() -> {
try {
while (!linkedQueue.isEmpty()) {
String value = linkedQueue.take();
System.out.println("LinkedQueue Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// থ্রেড চালু করা
arrayProducer.start();
arrayConsumer.start();
linkedProducer.start();
linkedConsumer.start();
}
}
ArrayBlockingQueue: ছোট এবং ফিক্সড সাইজের ডেটার জন্য কার্যকর।LinkedBlockingQueue: বড় এবং ডাইনামিক সাইজের ডেটার জন্য উপযুক্ত।- উভয়ই প্রযোজক-গ্রাহক প্যাটার্নে থ্রেড সিঙ্ক্রোনাইজেশনের জন্য কার্যকর এবং থ্রেড-সেফ।
উপযুক্ত প্রয়োজন অনুযায়ী এই দুটি কিউ ব্যবহার করে মাল্টিথ্রেডেড প্রোগ্রামিং সহজ ও দক্ষ করা যায়।
জাভা কনকারেন্সি হলো মাল্টিথ্রেডিং এবং সিঙ্ক্রোনাইজড প্রোগ্রামিংয়ের জন্য একটি শক্তিশালী প্যাকেজ (java.util.concurrent)। এটি ডেটা শেয়ারিং এবং থ্রেডের মধ্যে সমন্বয় পরিচালনার জন্য কার্যকরী টুলস প্রদান করে।
DelayQueue
DelayQueue কী?
DelayQueueহলো একটি ব্লকিং কিউ যাDelayedইন্টারফেস ইমপ্লিমেন্ট করা অবজেক্ট রাখে।- অবজেক্টগুলো নির্দিষ্ট সময় পর কিউ থেকে তোলা যায়।
- এটি টাইম-সেন্সিটিভ টাস্ক প্রসেসিংয়ের জন্য আদর্শ।
ব্যবহার:
- Task Scheduling: নির্দিষ্ট সময় পরে কোনো টাস্ক চালানোর জন্য।
- Caching: অবজেক্টের টাইম টু লাইভ (TTL) ম্যানেজ করতে।
উদাহরণ:
import java.util.concurrent.*;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Delayed {
private final String name;
private final long startTime;
public DelayedTask(String name, long delayInMillis) {
this.name = name;
this.startTime = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "Task{name='" + name + "'}";
}
}
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// Adding tasks
queue.put(new DelayedTask("Task1", 2000));
queue.put(new DelayedTask("Task2", 4000));
queue.put(new DelayedTask("Task3", 1000));
// Processing tasks
while (!queue.isEmpty()) {
DelayedTask task = queue.take(); // Waits until the delay expires
System.out.println("Processed: " + task);
}
}
}
PriorityBlockingQueue
PriorityBlockingQueue কী?
PriorityBlockingQueueহলো একটি থ্রেড-সেফ ব্লকিং কিউ যা এলিমেন্টগুলোকে তাদের প্রায়োরিটির ভিত্তিতে সাজায়।Comparableইন্টারফেস বাComparatorএর মাধ্যমে প্রায়োরিটি নির্ধারণ করা হয়।
ব্যবহার:
- Task Priority Management: উচ্চ-প্রায়োরিটির টাস্ক আগে প্রসেস করতে।
- Job Scheduling: মাল্টি-জব ম্যানেজমেন্ট সিস্টেমে।
উদাহরণ:
import java.util.concurrent.*;
public class PriorityBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
// Adding elements
queue.put("Low Priority");
queue.put("High Priority");
queue.put("Medium Priority");
// Retrieving elements (Natural Order or Comparable)
while (!queue.isEmpty()) {
System.out.println("Processed: " + queue.take());
}
}
}
SynchronousQueue
SynchronousQueue কী?
SynchronousQueueএকটি বিশেষ ধরনের ব্লকিং কিউ, যেখানে কোনো স্টোরেজ ক্ষমতা নেই। প্রতিটিput()মেথডের জন্য একটিtake()কল থাকা আবশ্যক।- এটি direct handoff করার জন্য ব্যবহৃত হয়।
ব্যবহার:
- Thread Communication: প্রোডিউসার এবং কনজিউমারের মধ্যে সরাসরি ডেটা শেয়ার।
- Load Balancing: একাধিক থ্রেডের মধ্যে কাজ ভাগাভাগি করতে।
উদাহরণ:
import java.util.concurrent.*;
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
// Producer Thread
new Thread(() -> {
try {
System.out.println("Putting: Task1");
queue.put("Task1");
System.out.println("Putting: Task2");
queue.put("Task2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// Consumer Thread
new Thread(() -> {
try {
System.out.println("Taken: " + queue.take());
System.out.println("Taken: " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
কনকারেন্সি টুলসের তুলনা
| Queue Type | Use Case | Blocking Behavior |
|---|---|---|
DelayQueue | টাইম-বেজড টাস্ক শিডিউলিং। | ডেটা তখনই অ্যাক্সেসযোগ্য যখন টাইম এক্সপায়ার। |
PriorityBlockingQueue | প্রায়োরিটি বেসড ডেটা প্রসেসিং। | ব্লক হয় না; সর্বোচ্চ প্রায়োরিটি এলিমেন্ট আগে প্রসেস হয়। |
SynchronousQueue | প্রোডিউসার-কনজিউমার সরাসরি ডেটা শেয়ারিং। | প্রতিটি put() এর জন্য একটি take() প্রয়োজন। |
DelayQueueনির্দিষ্ট সময় পর টাস্ক প্রসেস করার জন্য।PriorityBlockingQueueপ্রায়োরিটি নির্ভর ডেটা প্রসেসিংয়ে।SynchronousQueueথ্রেডের মধ্যে ডেটা আদান-প্রদানে। জাভার কনকারেন্সি টুলস সঠিকভাবে ব্যবহার করলে মাল্টিথ্রেডেড অ্যাপ্লিকেশন আরও কার্যকর এবং নির্ভরযোগ্য হয়।
Read more