Custom Partitioner তৈরি করা

Kafka Partitions এবং Data Distribution - অ্যাপাচি কাফকা (Apache Kafka) - Big Data and Analytics

317

কাফকা Partitioner হচ্ছে একটি মেকানিজম, যা মেসেজগুলি এক বা একাধিক পার্টিশনে ভাগ করতে ব্যবহৃত হয়। কাফকার ডিফল্ট পার্টিশনার সাধারণত কী (key) এর মাধ্যমে পার্টিশন নির্বাচন করে, কিন্তু যখন আপনি একটি কাস্টম পার্টিশনিং লজিক প্রয়োগ করতে চান, তখন কাস্টম পার্টিশনার তৈরি করা প্রয়োজন।

কাস্টম পার্টিশনার তৈরি করার মাধ্যমে আপনি ডেটা কীভাবে পার্টিশন করা হবে তা নিয়ন্ত্রণ করতে পারেন। এটি বিশেষত দরকারি হয় যখন আপনি ডেটা সুষমভাবে বিভিন্ন পার্টিশনে বিতরণ করতে চান বা বিশেষ ধরনের পার্টিশনিং কৌশল প্রয়োগ করতে চান।


Kafka Partitioner কী?

Kafka Partitioner হল একটি কম্পোনেন্ট যা প্রডিউসারকে নির্ধারণ করতে সাহায্য করে কোন পার্টিশনে মেসেজটি পাঠানো হবে। ডিফল্টভাবে, Kafka পার্টিশন নির্বাচন করতে মেসেজের কী (key) ব্যবহার করে, এবং কী এক্সিস্ট না করলে, একটি র‌্যান্ডম পার্টিশন নির্বাচন করে।

কিন্তু কখনো কখনো আপনার প্রজেক্টে এমন কিছু প্রয়োজনীয়তা থাকে, যেখানে পার্টিশনিং-এর জন্য কাস্টম লজিক দরকার হয়। এই ক্ষেত্রে, কাস্টম পার্টিশনার ব্যবহার করা হয়।


Custom Partitioner তৈরি করার ধাপ

আপনি যদি কাফকা প্রডিউসারকে কাস্টম পার্টিশনার ব্যবহার করতে চান, তবে আপনাকে একটি নতুন ক্লাস তৈরি করতে হবে যা org.apache.kafka.clients.producer.Partitioner ইন্টারফেসটি ইমপ্লিমেন্ট করবে।

১. Kafka Partitioner ইন্টারফেস ইমপ্লিমেন্ট করা

কাস্টম পার্টিশনার তৈরি করার জন্য প্রথমে Partitioner ইন্টারফেস ইমপ্লিমেন্ট করতে হবে। এই ইন্টারফেসটি তিনটি মেথড ধারণ করে:

  1. configure(Map<String, ?> configs): কনফিগারেশন সেটআপ করার জন্য ব্যবহৃত হয়।
  2. partition(Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster): এখানে কাস্টম পার্টিশনিং লজিক থাকে, যা মূল কাজটি করে।
  3. close(): পার্টিশনারটি বন্ধ করার জন্য ব্যবহৃত হয়।

২. Custom Partitioner উদাহরণ

এখানে একটি কাস্টম পার্টিশনার উদাহরণ দেওয়া হলো, যা কী (key)-এর প্রথম অক্ষরের উপর ভিত্তি করে পার্টিশন নির্বাচন করবে।

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // কোনো কনফিগারেশন প্রয়োজনে এখানে লোড করতে পারেন
    }

    @Override
    public int partition(Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            return 0;  // যদি কী না থাকে, তবে পার্টিশন 0 এ পাঠানো হবে
        }

        // কী এর প্রথম অক্ষরের ASCII কোডের ভিত্তিতে পার্টিশন নির্বাচন করা হচ্ছে
        String keyStr = (String) key;
        int partition = keyStr.charAt(0) % cluster.partitionCountForTopic("test-topic");

        return partition; // পছন্দসই পার্টিশন রিটার্ন করা
    }

    @Override
    public void close() {
        // পার্টিশনার ক্লোজ করার জন্য ব্যবহৃত
    }
}

এই উদাহরণে, আমরা কী এর প্রথম অক্ষরের ASCII কোড ব্যবহার করে পার্টিশন নির্বাচন করেছি। এটি কেবল একটি উদাহরণ, আপনি আপনার প্রয়োজন অনুযায়ী আরও জটিল লজিক ব্যবহার করতে পারেন।

৩. Kafka Producer কনফিগারেশন

কাস্টম পার্টিশনার ব্যবহারের জন্য প্রডিউসার কনফিগারেশন ফাইলে partitioner.class প্রপার্টি ব্যবহার করতে হবে। এই প্রপার্টিতে আপনার কাস্টম পার্টিশনারের ক্লাসের নাম দিতে হবে।

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
partitioner.class=com.example.CustomPartitioner

এখানে, com.example.CustomPartitioner হলো আমাদের কাস্টম পার্টিশনারের পুরো প্যাকেজ নাম। আপনি আপনার প্রোজেক্ট অনুযায়ী এটি কনফিগার করবেন।

৪. Producer Example

এখন একটি প্রডিউসার কোড উদাহরণ দেওয়া হল যা কাস্টম পার্টিশনার ব্যবহার করে:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Kafka Producer Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("partitioner.class", "com.example.CustomPartitioner");

        // Creating Producer object
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Sending message to Kafka Topic
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
            producer.send(record);
        }

        // Close the producer
        producer.close();
    }
}

এখানে, প্রডিউসার কনফিগারেশনটি কাস্টম পার্টিশনারের ক্লাস উল্লেখ করছে, এবং সেই অনুযায়ী ডেটা প্রেরণ করছে।


সারাংশ

Kafka-তে Custom Partitioner তৈরি করে আপনি আপনার ডেটার পার্টিশনিং কাস্টমাইজ করতে পারেন। এতে, আপনি আপনার লজিক অনুসারে মেসেজকে পার্টিশনে ভাগ করতে পারেন, যেমন কী এর উপর ভিত্তি করে, মেসেজের ভ্যালুর উপর ভিত্তি করে বা অন্য কোনো নির্দিষ্ট প্রক্রিয়া অনুসরণ করে। কাস্টম পার্টিশনার তৈরির জন্য, Partitioner ইন্টারফেস ইমপ্লিমেন্ট করে, আপনার কাস্টম লজিক লিখতে হবে এবং সেটি প্রডিউসার কনফিগারেশনে উল্লেখ করতে হবে। এটি ডেটা সুষমভাবে বিতরণ করতে এবং পারফরম্যান্স অপটিমাইজ করতে সহায়তা করে।

Content added By
Promotion

Are you sure to start over?

Loading...