BlockingQueue এবং Producer-Consumer Pattern

জাভা কনকারেন্সি (Java Concurrency) - Java Technologies

271

Producer-Consumer Pattern মাল্টিথ্রেডেড প্রোগ্রামিংয়ে একটি সাধারণ প্যাটার্ন যেখানে একটি থ্রেড প্রযোজক (Producer) হিসাবে কাজ করে এবং ডেটা তৈরি করে, এবং অন্য থ্রেড ভোক্তা (Consumer) হিসাবে কাজ করে এবং ডেটা প্রক্রিয়া করে। এই দুই থ্রেডের মধ্যে ডেটা শেয়ার করার জন্য একটি থ্রেড-সেফ কিউ ব্যবহৃত হয়।

BlockingQueue এই প্যাটার্ন বাস্তবায়নের জন্য একটি জনপ্রিয় ডেটা স্ট্রাকচার কারণ এটি স্বয়ংক্রিয়ভাবে থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে।


BlockingQueue কীভাবে কাজ করে?

  1. put(E e): প্রযোজক একটি আইটেম কিউতে যোগ করে। যদি কিউ পূর্ণ থাকে, এটি ব্লক করে।
  2. take(): ভোক্তা কিউ থেকে একটি আইটেম গ্রহণ করে। যদি কিউ খালি থাকে, এটি ব্লক করে।
  3. ইমপ্লিমেন্টেশন:
    • ArrayBlockingQueue: একটি নির্দিষ্ট আকারের ব্লকিং কিউ।
    • LinkedBlockingQueue: ডায়নামিক আকারের কিউ যা লিংকড লিস্ট দিয়ে তৈরি।

Producer-Consumer Pattern এর উদাহরণ

কোড উদাহরণ: BlockingQueue ব্যবহার করে Producer-Consumer

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) {
        // BlockingQueue এর একটি উদাহরণ তৈরি করুন
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    System.out.println("Producer produced: " + i);
                    queue.put(i); // আইটেম যোগ করা
                    Thread.sleep(500); // কিছু সময় বিরতি
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // ভোক্তা থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take(); // আইটেম গ্রহণ করা
                    System.out.println("Consumer consumed: " + item);
                    Thread.sleep(1000); // কিছু সময় বিরতি
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // থ্রেড চালু করা
        producer.start();
        consumer.start();
    }
}

কোড বিশ্লেষণ

  1. BlockingQueue ব্যবহার: ArrayBlockingQueue ব্যবহার করা হয়েছে, যা ৫টি উপাদান পর্যন্ত সংরক্ষণ করতে পারে।
  2. Producer Logic:
    • আইটেম তৈরি করে put() ব্যবহার করে কিউতে যোগ করে।
    • কিউ পূর্ণ হলে থ্রেড ব্লক করে এবং অপেক্ষা করে।
  3. Consumer Logic:
    • কিউ থেকে আইটেম গ্রহণ করে take() ব্যবহার করে।
    • কিউ খালি হলে থ্রেড ব্লক করে এবং অপেক্ষা করে।
  4. Synchronization: BlockingQueue স্বয়ংক্রিয়ভাবে থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে, তাই আলাদাভাবে লকিং দরকার হয় না।

আরও উন্নত উদাহরণ: প্রযোজক-ভোক্তা একটি নির্দিষ্ট সময়ে বন্ধ হবে

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerWithTermination {

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        boolean[] stopFlag = {false}; // থ্রেড বন্ধ করার জন্য একটি ফ্ল্যাগ

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    System.out.println("Producer produced: " + i);
                    queue.put(i);
                    Thread.sleep(500);
                }
                stopFlag[0] = true; // উৎপাদন শেষ
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // ভোক্তা থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (!stopFlag[0] || !queue.isEmpty()) {
                    Integer item = queue.take();
                    System.out.println("Consumer consumed: " + item);
                    Thread.sleep(1000);
                }
                System.out.println("Consumer exiting as no more items.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

উপকারিতা

  1. লক-ফ্রি অপারেশন: BlockingQueue লকিং ছাড়াই থ্রেড সিঙ্ক্রোনাইজেশন পরিচালনা করে।
  2. সহজ ইন্টারফেস: put() এবং take() সরাসরি থ্রেডের কাজ পরিচালনা করে।
  3. মাল্টিপ্রোডিউসার-মাল্টিকনজিউমার: একাধিক প্রযোজক ও ভোক্তা সহজে যুক্ত করা যায়।

প্র্যাকটিকাল ব্যবহারের ক্ষেত্র

  1. লগ প্রসেসিং সিস্টেম: লগ প্রযোজক ডেটা তৈরি করে এবং প্রসেসিং থ্রেড তা প্রক্রিয়াজাত করে।
  2. টাস্ক প্রসেসিং: ওয়ার্কার থ্রেড পুল ব্যবহার করে কাজ ভাগ করা।
  3. রিয়েল-টাইম ডেটা স্ট্রিম: ডেটা ক্যাপচার (Producer) এবং অ্যানালিটিকস প্রসেসিং (Consumer)।

BlockingQueue এবং Producer-Consumer Pattern মাল্টিথ্রেডেড প্রোগ্রামিং সহজ এবং কার্যকর করে। এটি ডেটা ভাগাভাগি এবং থ্রেড সিঙ্ক্রোনাইজেশন নিশ্চিত করতে ব্যবহৃত হয়। সঠিক ব্যবহার নিশ্চিত করলে এটি উচ্চ কার্যক্ষম এবং নির্ভরযোগ্য সমাধান প্রদান করে।

Content added By

BlockingQueue হল একটি থ্রেড-সেফ কিউ, যা প্রযোজক-গ্রাহক (Producer-Consumer) প্যাটার্নে ব্যবহৃত হয়। এটি জাভার java.util.concurrent প্যাকেজের অংশ এবং থ্রেড সিঙ্ক্রোনাইজেশনের জন্য ব্লকিং অপারেশন প্রদান করে।


BlockingQueue এর বৈশিষ্ট্য

  1. থ্রেড-সেফ: একাধিক থ্রেড একসাথে একই কিউ ব্যবহার করতে পারে।
  2. ব্লকিং মেকানিজম:
    • যদি কিউ খালি থাকে, take() মেথড থ্রেডকে ব্লক করে রাখে যতক্ষণ না কোনো আইটেম যোগ করা হয়।
    • যদি কিউ পূর্ণ থাকে, put() মেথড থ্রেডকে ব্লক করে রাখে যতক্ষণ না কোনো স্থান খালি হয়।
  3. ইমপ্লিমেন্টেশন: এর অনেক ইমপ্লিমেন্টেশন রয়েছে:
    • ArrayBlockingQueue: নির্দিষ্ট আকারের কিউ।
    • LinkedBlockingQueue: লিংকড লিস্ট ব্যবহার করে।
    • PriorityBlockingQueue: অর্ডার্ড উপাদান সংরক্ষণ করে।
    • DelayQueue: ডিলেই অপারেশনের জন্য ব্যবহৃত হয়।

BlockingQueue এর মেথডসমূহ

  1. put(E e): কিউতে একটি উপাদান যোগ করে; যদি কিউ পূর্ণ হয়, এটি ব্লক করে।
  2. take(): কিউ থেকে একটি উপাদান নেয়; যদি কিউ খালি হয়, এটি ব্লক করে।
  3. offer(E e): একটি উপাদান যোগ করার চেষ্টা করে; তাৎক্ষণিক ব্যর্থ হলে false রিটার্ন করে।
  4. poll(): একটি উপাদান নেওয়ার চেষ্টা করে; কিউ খালি থাকলে null রিটার্ন করে।

BlockingQueue এর ব্যবহার: প্রযোজক-গ্রাহক উদাহরণ

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) {
        // ৫টি উপাদানের সীমা সহ একটি BlockingQueue তৈরি করা
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i); // কিউতে মান যোগ করা
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // কিউ থেকে মান নেওয়া
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

BlockingQueue এর ব্যবহার: ফিক্সড থ্রেড পুল সহ উদাহরণ

import java.util.concurrent.*;

public class ThreadPoolWithBlockingQueue {

    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(10);

        // ThreadPoolExecutor তৈরি করা
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // কোর থ্রেড
                4, // সর্বাধিক থ্রেড
                60, // আইডল থ্রেডের জন্য অপেক্ষার সময়
                TimeUnit.SECONDS,
                taskQueue
        );

        // টাস্ক যোগ করা
        for (int i = 1; i <= 15; i++) {
            int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Executing Task: " + taskNumber);
                try {
                    Thread.sleep(1000); // কাজ সম্পন্ন করা
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

BlockingQueue এর বিভিন্ন ইমপ্লিমেন্টেশন

১. ArrayBlockingQueue

  • নির্দিষ্ট আকারের।
  • প্রথমে ইনসার্ট, প্রথমে রিমুভ (FIFO)।
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

২. LinkedBlockingQueue

  • ডায়নামিক আকারের।
  • বড় ডেটা সেটের জন্য উপযুক্ত।
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

৩. PriorityBlockingQueue

  • উপাদানগুলিকে অর্ডার করে রাখে।
  • Comparator ব্যবহার করা যায়।
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

৪. DelayQueue

  • উপাদানগুলোকে ডিলেই সময় পর্যন্ত হোল্ড করে।
  • Delayed ইন্টারফেস ব্যবহার করা হয়।
BlockingQueue<DelayedTask> queue = new DelayQueue<>();

BlockingQueue এর সুবিধা

  1. থ্রেড-সেফ: একাধিক থ্রেড একসাথে কাজ করতে পারে।
  2. ব্লকিং মেকানিজম: ব্যস্ত অপেক্ষা (busy-waiting) প্রতিরোধ করে।
  3. মাল্টি-থ্রেড প্রোগ্রামিংয়ে সহজ: প্রযোজক-গ্রাহক প্যাটার্ন সহজ করে।

BlockingQueue এর সীমাবদ্ধতা

  1. ফুল কিউ: যদি কিউ পূর্ণ হয়, তখন put() থ্রেডকে ব্লক করে।
  2. খালি কিউ: যদি কিউ খালি থাকে, তখন take() থ্রেডকে ব্লক করে।
  3. ডিলেইড প্রসেসিং: বড় ডেটা সেটের জন্য অপেক্ষা সময় বাড়তে পারে।

BlockingQueue হল জাভার কনকারেন্সি মডেলের একটি অত্যন্ত কার্যকর ডেটা স্ট্রাকচার, যা থ্রেড-সেফ এবং ব্লকিং মেকানিজম সমর্থন করে। এটি প্রযোজক-গ্রাহক প্যাটার্ন, থ্রেড পুল, এবং ডেলেইড প্রসেসিং এর মত কাজ সহজ করে। সঠিক ইমপ্লিমেন্টেশন এবং অপারেশন চয়নের মাধ্যমে এটি মাল্টি-থ্রেডেড প্রোগ্রামিংকে আরও কার্যকর ও উন্নত করে।

Content added By

Producer-Consumer Problem একটি ক্লাসিক কনকারেন্সি সমস্যা, যেখানে একটি প্রযোজক (Producer) ডেটা তৈরি করে এবং একটি গ্রাহক (Consumer) সেই ডেটা ব্যবহার করে। এই সমস্যা সমাধানের জন্য প্রযোজক এবং গ্রাহকের মধ্যে সিঙ্ক্রোনাইজেশন দরকার যাতে ডেটা লস বা ডেডলক না ঘটে।


সমাধান কৌশল

  1. BlockingQueue ব্যবহার করে: জাভার BlockingQueue ক্লাস এই সমস্যার জন্য সহজ ও কার্যকর সমাধান প্রদান করে।
  2. wait()-notify() মেথড ব্যবহার করে: ম্যানুয়ালি থ্রেড সিঙ্ক্রোনাইজেশনের মাধ্যমে সমস্যার সমাধান করা।
  3. Locks এবং Conditions ব্যবহার করে: লক এবং কন্ডিশন ভেরিয়েবল ব্যবহার করে আরও কাস্টমাইজড সমাধান।

BlockingQueue ব্যবহার করে সমাধান

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerWithBlockingQueue {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // কিউ-এর আকার ৫

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i); // কিউতে ডেটা যোগ করা
                    System.out.println("Produced: " + i);
                    Thread.sleep(500); // স্লিপ করে ধীরে উৎপাদন
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // কিউ থেকে ডেটা পড়া
                    System.out.println("Consumed: " + value);
                    Thread.sleep(1000); // স্লিপ করে ধীরে গ্রাহন
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

কোডের ব্যাখ্যা

  • put() এবং take(): BlockingQueue স্বয়ংক্রিয়ভাবে ব্লক করে যদি কিউ পূর্ণ বা খালি থাকে।
  • সিঙ্ক্রোনাইজেশন: থ্রেড সেফটি নিশ্চিত করতে কোনও অতিরিক্ত লক বা wait-notify প্রয়োজন নেই।

wait()-notify() ব্যবহার করে সমাধান

import java.util.LinkedList;

class SharedResource {
    private final LinkedList<Integer> list = new LinkedList<>();
    private final int CAPACITY = 5;

    public synchronized void produce(int value) throws InterruptedException {
        while (list.size() == CAPACITY) {
            wait(); // কিউ পূর্ণ হলে অপেক্ষা
        }
        list.add(value);
        System.out.println("Produced: " + value);
        notify(); // গ্রাহককে জাগ্রত করা
    }

    public synchronized int consume() throws InterruptedException {
        while (list.isEmpty()) {
            wait(); // কিউ খালি হলে অপেক্ষা
        }
        int value = list.removeFirst();
        System.out.println("Consumed: " + value);
        notify(); // প্রযোজককে জাগ্রত করা
        return value;
    }
}

public class ProducerConsumerWithWaitNotify {
    public static void main(String[] args) {
        SharedResource sharedResource = new SharedResource();

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    sharedResource.produce(i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    sharedResource.consume();
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

কোডের ব্যাখ্যা

  • wait() এবং notify():
    • যখন কিউ পূর্ণ থাকে, প্রযোজক থ্রেড wait() করে।
    • যখন কিউ খালি থাকে, গ্রাহক থ্রেড wait() করে।
    • notify() ব্যবহার করে অপেক্ষমান থ্রেডকে জাগ্রত করা হয়।
  • সিঙ্ক্রোনাইজড ব্লক: মাল্টিথ্রেডেড এক্সেসকে সুরক্ষিত করে।

Locks এবং Conditions ব্যবহার করে সমাধান

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResourceWithLock {
    private final LinkedList<Integer> list = new LinkedList<>();
    private final int CAPACITY = 5;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == CAPACITY) {
                notFull.await(); // কিউ পূর্ণ হলে অপেক্ষা
            }
            list.add(value);
            System.out.println("Produced: " + value);
            notEmpty.signal(); // গ্রাহককে জাগ্রত করা
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                notEmpty.await(); // কিউ খালি হলে অপেক্ষা
            }
            int value = list.removeFirst();
            System.out.println("Consumed: " + value);
            notFull.signal(); // প্রযোজককে জাগ্রত করা
            return value;
        } finally {
            lock.unlock();
        }
    }
}

public class ProducerConsumerWithLocks {
    public static void main(String[] args) {
        SharedResourceWithLock sharedResource = new SharedResourceWithLock();

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    sharedResource.produce(i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    sharedResource.consume();
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

কোডের ব্যাখ্যা

  • Lock এবং Condition:
    • notFull.await() এবং notEmpty.await() নির্দিষ্ট কন্ডিশনে থ্রেডকে অপেক্ষা করায়।
    • notFull.signal() এবং notEmpty.signal() ব্যবহার করে অপেক্ষমান থ্রেডকে জাগ্রত করা হয়।
  • উন্নত নিয়ন্ত্রণ: Condition ব্যবহারে নির্দিষ্ট কন্ডিশনের উপর ভিত্তি করে সিঙ্ক্রোনাইজেশন সহজ হয়।

  1. BlockingQueue: সরল এবং কার্যকর। BlockingQueue ব্যবহার করা সর্বাধিক সুপারিশকৃত সমাধান।
  2. wait()-notify(): বেশি কাস্টমাইজড এবং ম্যানুয়াল কনট্রোলের জন্য উপযোগী।
  3. Locks এবং Conditions: জটিল সমস্যার জন্য যেখানে স্পষ্ট এবং নির্দিষ্ট নিয়ন্ত্রণ প্রয়োজন।

Producer-Consumer সমস্যা সমাধানে এই পদ্ধতিগুলি জাভার কনকারেন্সি ফ্রেমওয়ার্কের শক্তি এবং নমনীয়তা প্রমাণ করে।

Content added By

ArrayBlockingQueue এবং LinkedBlockingQueue হল জাভার কনকারেন্সি ফ্রেমওয়ার্কে BlockingQueue ইন্টারফেসের দুটি গুরুত্বপূর্ণ ইমপ্লিমেন্টেশন। এগুলো মূলত প্রযোজক-গ্রাহক (Producer-Consumer) প্যাটার্নে ব্যবহৃত হয় এবং থ্রেড সিঙ্ক্রোনাইজেশন নিশ্চিত করে।


১. ArrayBlockingQueue

ArrayBlockingQueue হলো একটি ফিক্সড সাইজের ব্লকিং কিউ, যেখানে একটি নির্দিষ্ট আকারের অ্যারে ব্যবহার করা হয়। এটি FIFO (First-In-First-Out) অর্ডারে কাজ করে।

বৈশিষ্ট্য:

  • পূর্বনির্ধারিত আকার (capacity)।
  • একসাথে একাধিক থ্রেড দ্বারা ব্যবহৃত হতে পারে।
  • থ্রেড-সেফ, যেখানে প্রযোজক এবং গ্রাহক লকিং মেকানিজম ব্যবহার করে।

উদাহরণ:

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // আকার 5

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // কিউতে মান যোগ করা
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // কিউ থেকে মান নেওয়া
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

২. LinkedBlockingQueue

LinkedBlockingQueue হলো একটি ডাইনামিক সাইজের ব্লকিং কিউ, যা লিঙ্কড লিস্ট ব্যবহার করে তৈরি করা হয়। এটি FIFO অর্ডারে কাজ করে এবং বড় আকারের ডেটা পরিচালনার জন্য উপযুক্ত।

বৈশিষ্ট্য:

  • ফিক্সড বা আনলিমিটেড সাইজ হতে পারে।
  • প্রযোজক এবং গ্রাহকের জন্য পৃথক লক ব্যবহার করে, ফলে লক প্রতিযোগিতা (lock contention) কম হয়।
  • বড় ডেটাসেট পরিচালনার জন্য কার্যকর।

উদাহরণ:

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // ফিক্সড আকার 3

        // প্রযোজক থ্রেড
        Thread producer = new Thread(() -> {
            try {
                queue.put("One");
                System.out.println("Produced: One");
                queue.put("Two");
                System.out.println("Produced: Two");
                queue.put("Three");
                System.out.println("Produced: Three");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // গ্রাহক থ্রেড
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String value = queue.take(); // কিউ থেকে মান নেওয়া
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

৩. ArrayBlockingQueue এবং LinkedBlockingQueue এর তুলনা

বৈশিষ্ট্যArrayBlockingQueueLinkedBlockingQueue
মেমোরি স্ট্রাকচারফিক্সড সাইজ অ্যারেলিঙ্কড লিস্ট
ডেটা সাইজপূর্বনির্ধারিত (capacity বাধ্যতামূলক)ফিক্সড বা আনলিমিটেড
লক ব্যবস্থাপনাএকক লকপৃথক লক (প্রযোজক ও গ্রাহকের জন্য আলাদা)
পারফরম্যান্সছোট ডেটাসেটের জন্য কার্যকরবড় ডেটাসেটের জন্য কার্যকর
ব্যবহার ক্ষেত্রসীমিত ডেটা পরিচালনাবড় বা ডাইনামিক ডেটাসেট

৪. দুইটি একত্রে ব্যবহার: উদাহরণ

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CombinedBlockingQueueExample {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> arrayQueue = new ArrayBlockingQueue<>(3);
        LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();

        // প্রযোজক থ্রেড (ArrayBlockingQueue)
        Thread arrayProducer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    arrayQueue.put(i);
                    System.out.println("ArrayQueue Produced: " + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // গ্রাহক থ্রেড (ArrayBlockingQueue)
        Thread arrayConsumer = new Thread(() -> {
            try {
                while (!arrayQueue.isEmpty()) {
                    Integer value = arrayQueue.take();
                    System.out.println("ArrayQueue Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // প্রযোজক থ্রেড (LinkedBlockingQueue)
        Thread linkedProducer = new Thread(() -> {
            try {
                linkedQueue.put("One");
                linkedQueue.put("Two");
                linkedQueue.put("Three");
                System.out.println("LinkedQueue Produced: Three items");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // গ্রাহক থ্রেড (LinkedBlockingQueue)
        Thread linkedConsumer = new Thread(() -> {
            try {
                while (!linkedQueue.isEmpty()) {
                    String value = linkedQueue.take();
                    System.out.println("LinkedQueue Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // থ্রেড চালু করা
        arrayProducer.start();
        arrayConsumer.start();
        linkedProducer.start();
        linkedConsumer.start();
    }
}

  1. ArrayBlockingQueue: ছোট এবং ফিক্সড সাইজের ডেটার জন্য কার্যকর।
  2. LinkedBlockingQueue: বড় এবং ডাইনামিক সাইজের ডেটার জন্য উপযুক্ত।
  3. উভয়ই প্রযোজক-গ্রাহক প্যাটার্নে থ্রেড সিঙ্ক্রোনাইজেশনের জন্য কার্যকর এবং থ্রেড-সেফ।

উপযুক্ত প্রয়োজন অনুযায়ী এই দুটি কিউ ব্যবহার করে মাল্টিথ্রেডেড প্রোগ্রামিং সহজ ও দক্ষ করা যায়।

Content added By

জাভা কনকারেন্সি হলো মাল্টিথ্রেডিং এবং সিঙ্ক্রোনাইজড প্রোগ্রামিংয়ের জন্য একটি শক্তিশালী প্যাকেজ (java.util.concurrent)। এটি ডেটা শেয়ারিং এবং থ্রেডের মধ্যে সমন্বয় পরিচালনার জন্য কার্যকরী টুলস প্রদান করে।


DelayQueue

DelayQueue কী?

  • DelayQueue হলো একটি ব্লকিং কিউ যা Delayed ইন্টারফেস ইমপ্লিমেন্ট করা অবজেক্ট রাখে।
  • অবজেক্টগুলো নির্দিষ্ট সময় পর কিউ থেকে তোলা যায়।
  • এটি টাইম-সেন্সিটিভ টাস্ক প্রসেসিংয়ের জন্য আদর্শ।

ব্যবহার:

  • Task Scheduling: নির্দিষ্ট সময় পরে কোনো টাস্ক চালানোর জন্য।
  • Caching: অবজেক্টের টাইম টু লাইভ (TTL) ম্যানেজ করতে।

উদাহরণ:

import java.util.concurrent.*;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private final String name;
    private final long startTime;

    public DelayedTask(String name, long delayInMillis) {
        this.name = name;
        this.startTime = System.currentTimeMillis() + delayInMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "Task{name='" + name + "'}";
    }
}

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();

        // Adding tasks
        queue.put(new DelayedTask("Task1", 2000));
        queue.put(new DelayedTask("Task2", 4000));
        queue.put(new DelayedTask("Task3", 1000));

        // Processing tasks
        while (!queue.isEmpty()) {
            DelayedTask task = queue.take(); // Waits until the delay expires
            System.out.println("Processed: " + task);
        }
    }
}

PriorityBlockingQueue

PriorityBlockingQueue কী?

  • PriorityBlockingQueue হলো একটি থ্রেড-সেফ ব্লকিং কিউ যা এলিমেন্টগুলোকে তাদের প্রায়োরিটির ভিত্তিতে সাজায়।
  • Comparable ইন্টারফেস বা Comparator এর মাধ্যমে প্রায়োরিটি নির্ধারণ করা হয়।

ব্যবহার:

  • Task Priority Management: উচ্চ-প্রায়োরিটির টাস্ক আগে প্রসেস করতে।
  • Job Scheduling: মাল্টি-জব ম্যানেজমেন্ট সিস্টেমে।

উদাহরণ:

import java.util.concurrent.*;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();

        // Adding elements
        queue.put("Low Priority");
        queue.put("High Priority");
        queue.put("Medium Priority");

        // Retrieving elements (Natural Order or Comparable)
        while (!queue.isEmpty()) {
            System.out.println("Processed: " + queue.take());
        }
    }
}

SynchronousQueue

SynchronousQueue কী?

  • SynchronousQueue একটি বিশেষ ধরনের ব্লকিং কিউ, যেখানে কোনো স্টোরেজ ক্ষমতা নেই। প্রতিটি put() মেথডের জন্য একটি take() কল থাকা আবশ্যক।
  • এটি direct handoff করার জন্য ব্যবহৃত হয়।

ব্যবহার:

  • Thread Communication: প্রোডিউসার এবং কনজিউমারের মধ্যে সরাসরি ডেটা শেয়ার।
  • Load Balancing: একাধিক থ্রেডের মধ্যে কাজ ভাগাভাগি করতে।

উদাহরণ:

import java.util.concurrent.*;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Producer Thread
        new Thread(() -> {
            try {
                System.out.println("Putting: Task1");
                queue.put("Task1");
                System.out.println("Putting: Task2");
                queue.put("Task2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Consumer Thread
        new Thread(() -> {
            try {
                System.out.println("Taken: " + queue.take());
                System.out.println("Taken: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

কনকারেন্সি টুলসের তুলনা

Queue TypeUse CaseBlocking Behavior
DelayQueueটাইম-বেজড টাস্ক শিডিউলিং।ডেটা তখনই অ্যাক্সেসযোগ্য যখন টাইম এক্সপায়ার।
PriorityBlockingQueueপ্রায়োরিটি বেসড ডেটা প্রসেসিং।ব্লক হয় না; সর্বোচ্চ প্রায়োরিটি এলিমেন্ট আগে প্রসেস হয়।
SynchronousQueueপ্রোডিউসার-কনজিউমার সরাসরি ডেটা শেয়ারিং।প্রতিটি put() এর জন্য একটি take() প্রয়োজন।

  • DelayQueue নির্দিষ্ট সময় পর টাস্ক প্রসেস করার জন্য।
  • PriorityBlockingQueue প্রায়োরিটি নির্ভর ডেটা প্রসেসিংয়ে।
  • SynchronousQueue থ্রেডের মধ্যে ডেটা আদান-প্রদানে। জাভার কনকারেন্সি টুলস সঠিকভাবে ব্যবহার করলে মাল্টিথ্রেডেড অ্যাপ্লিকেশন আরও কার্যকর এবং নির্ভরযোগ্য হয়।
Content added By
Promotion

Are you sure to start over?

Loading...