কাফকা Partitioner হচ্ছে একটি মেকানিজম, যা মেসেজগুলি এক বা একাধিক পার্টিশনে ভাগ করতে ব্যবহৃত হয়। কাফকার ডিফল্ট পার্টিশনার সাধারণত কী (key) এর মাধ্যমে পার্টিশন নির্বাচন করে, কিন্তু যখন আপনি একটি কাস্টম পার্টিশনিং লজিক প্রয়োগ করতে চান, তখন কাস্টম পার্টিশনার তৈরি করা প্রয়োজন।
কাস্টম পার্টিশনার তৈরি করার মাধ্যমে আপনি ডেটা কীভাবে পার্টিশন করা হবে তা নিয়ন্ত্রণ করতে পারেন। এটি বিশেষত দরকারি হয় যখন আপনি ডেটা সুষমভাবে বিভিন্ন পার্টিশনে বিতরণ করতে চান বা বিশেষ ধরনের পার্টিশনিং কৌশল প্রয়োগ করতে চান।
Kafka Partitioner কী?
Kafka Partitioner হল একটি কম্পোনেন্ট যা প্রডিউসারকে নির্ধারণ করতে সাহায্য করে কোন পার্টিশনে মেসেজটি পাঠানো হবে। ডিফল্টভাবে, Kafka পার্টিশন নির্বাচন করতে মেসেজের কী (key) ব্যবহার করে, এবং কী এক্সিস্ট না করলে, একটি র্যান্ডম পার্টিশন নির্বাচন করে।
কিন্তু কখনো কখনো আপনার প্রজেক্টে এমন কিছু প্রয়োজনীয়তা থাকে, যেখানে পার্টিশনিং-এর জন্য কাস্টম লজিক দরকার হয়। এই ক্ষেত্রে, কাস্টম পার্টিশনার ব্যবহার করা হয়।
Custom Partitioner তৈরি করার ধাপ
আপনি যদি কাফকা প্রডিউসারকে কাস্টম পার্টিশনার ব্যবহার করতে চান, তবে আপনাকে একটি নতুন ক্লাস তৈরি করতে হবে যা org.apache.kafka.clients.producer.Partitioner ইন্টারফেসটি ইমপ্লিমেন্ট করবে।
১. Kafka Partitioner ইন্টারফেস ইমপ্লিমেন্ট করা
কাস্টম পার্টিশনার তৈরি করার জন্য প্রথমে Partitioner ইন্টারফেস ইমপ্লিমেন্ট করতে হবে। এই ইন্টারফেসটি তিনটি মেথড ধারণ করে:
configure(Map<String, ?> configs): কনফিগারেশন সেটআপ করার জন্য ব্যবহৃত হয়।partition(Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster): এখানে কাস্টম পার্টিশনিং লজিক থাকে, যা মূল কাজটি করে।close(): পার্টিশনারটি বন্ধ করার জন্য ব্যবহৃত হয়।
২. Custom Partitioner উদাহরণ
এখানে একটি কাস্টম পার্টিশনার উদাহরণ দেওয়া হলো, যা কী (key)-এর প্রথম অক্ষরের উপর ভিত্তি করে পার্টিশন নির্বাচন করবে।
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// কোনো কনফিগারেশন প্রয়োজনে এখানে লোড করতে পারেন
}
@Override
public int partition(Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (key == null) {
return 0; // যদি কী না থাকে, তবে পার্টিশন 0 এ পাঠানো হবে
}
// কী এর প্রথম অক্ষরের ASCII কোডের ভিত্তিতে পার্টিশন নির্বাচন করা হচ্ছে
String keyStr = (String) key;
int partition = keyStr.charAt(0) % cluster.partitionCountForTopic("test-topic");
return partition; // পছন্দসই পার্টিশন রিটার্ন করা
}
@Override
public void close() {
// পার্টিশনার ক্লোজ করার জন্য ব্যবহৃত
}
}
এই উদাহরণে, আমরা কী এর প্রথম অক্ষরের ASCII কোড ব্যবহার করে পার্টিশন নির্বাচন করেছি। এটি কেবল একটি উদাহরণ, আপনি আপনার প্রয়োজন অনুযায়ী আরও জটিল লজিক ব্যবহার করতে পারেন।
৩. Kafka Producer কনফিগারেশন
কাস্টম পার্টিশনার ব্যবহারের জন্য প্রডিউসার কনফিগারেশন ফাইলে partitioner.class প্রপার্টি ব্যবহার করতে হবে। এই প্রপার্টিতে আপনার কাস্টম পার্টিশনারের ক্লাসের নাম দিতে হবে।
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
partitioner.class=com.example.CustomPartitioner
এখানে, com.example.CustomPartitioner হলো আমাদের কাস্টম পার্টিশনারের পুরো প্যাকেজ নাম। আপনি আপনার প্রোজেক্ট অনুযায়ী এটি কনফিগার করবেন।
৪. Producer Example
এখন একটি প্রডিউসার কোড উদাহরণ দেওয়া হল যা কাস্টম পার্টিশনার ব্যবহার করে:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Kafka Producer Configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("partitioner.class", "com.example.CustomPartitioner");
// Creating Producer object
Producer<String, String> producer = new KafkaProducer<>(props);
// Sending message to Kafka Topic
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
producer.send(record);
}
// Close the producer
producer.close();
}
}
এখানে, প্রডিউসার কনফিগারেশনটি কাস্টম পার্টিশনারের ক্লাস উল্লেখ করছে, এবং সেই অনুযায়ী ডেটা প্রেরণ করছে।
সারাংশ
Kafka-তে Custom Partitioner তৈরি করে আপনি আপনার ডেটার পার্টিশনিং কাস্টমাইজ করতে পারেন। এতে, আপনি আপনার লজিক অনুসারে মেসেজকে পার্টিশনে ভাগ করতে পারেন, যেমন কী এর উপর ভিত্তি করে, মেসেজের ভ্যালুর উপর ভিত্তি করে বা অন্য কোনো নির্দিষ্ট প্রক্রিয়া অনুসরণ করে। কাস্টম পার্টিশনার তৈরির জন্য, Partitioner ইন্টারফেস ইমপ্লিমেন্ট করে, আপনার কাস্টম লজিক লিখতে হবে এবং সেটি প্রডিউসার কনফিগারেশনে উল্লেখ করতে হবে। এটি ডেটা সুষমভাবে বিতরণ করতে এবং পারফরম্যান্স অপটিমাইজ করতে সহায়তা করে।
Read more