অ্যাপাচি কাফকা Producer API ব্যবহার করে ডেটা কাফকা টপিকে প্রেরণ করা হয়। এটি একটি গুরুত্বপূর্ণ কম্পোনেন্ট কারণ এটি ডেটা সিস্টেম থেকে কাফকা সার্ভারে পাঠানোর দায়িত্ব পালন করে। কাফকা প্রডিউসার সাধারণত ডিস্ট্রিবিউটেড সিস্টেমে ডেটা ট্রান্সফারের জন্য ব্যবহৃত হয়, এবং এর কার্যকারিতা এবং কনফিগারেশন খুবই গুরুত্বপূর্ণ।
Kafka Producer API এর কার্যক্রম
Kafka Producer API এমন একটি ইন্টারফেস যা আপনাকে টপিকগুলোতে ডেটা পাঠানোর জন্য ব্যবহৃত হয়। Producer API ডেটাকে পার্টিশন অনুযায়ী প্রেরণ করে এবং তা কাফকা ব্রোকারে সংরক্ষণ করে।
Producer API-এর মূল কার্যাবলি:
- Producer Configuration: প্রডিউসার কনফিগারেশন ফাইল ব্যবহার করে কাফকা প্রডিউসার কনফিগার করা হয়। এটি ব্রোকার অ্যাড্রেস, সেরিয়ালাইজার, এবং অন্যান্য বৈশিষ্ট্য নির্ধারণ করতে সহায়তা করে।
- Serialization: কাফকা প্রডিউসার পাঠানো ডেটাকে সেরিয়ালাইজ করে। আপনি যদি স্ট্রিং অথবা JSON টাইপের ডেটা পাঠাতে চান, তবে সেগুলোর সেরিয়ালাইজারের জন্য সঠিক ক্লাস ব্যবহার করতে হবে।
- Sending Data: প্রডিউসার একাধিক বার ডেটা পাঠাতে পারে এবং এটি একটি অ্যাসিঙ্ক্রোনাস পদ্ধতিতে কাজ করে, অর্থাৎ ডেটা প্রেরণ হয়ে যাওয়ার পর প্রডিউসার কমিউনিকেশন প্রক্রিয়াটি অব্যাহত রাখে।
Kafka Producer API-এর সাধারণ স্টেপস:
- প্রডিউসার কনফিগারেশন তৈরি করা
- কনফিগারেশন অনুযায়ী প্রডিউসার ইনিশিয়ালাইজ করা
- ডেটা পাঠানো
- প্রডিউসার বন্ধ করা
Kafka Producer Configuration
Kafka প্রডিউসার কনফিগারেশন বিভিন্ন অপশন দিয়ে কনফিগার করা হয়। প্রডিউসার কনফিগারেশন ক্লাসটি org.apache.kafka.clients.producer.ProducerConfig হিসেবে পরিচিত।
নীচে কিছু প্রধান প্রডিউসার কনফিগারেশন অপশন আলোচনা করা হল:
1. bootstrap.servers
এই অপশনটি আপনার কাফকা ব্রোকারের লিস্ট নির্ধারণ করে, যেখানে প্রডিউসার ডেটা পাঠাবে।
bootstrap.servers=localhost:9092
2. key.serializer
এটি কী (Key) সেরিয়ালাইজারের জন্য ব্যবহৃত হয়। সাধারণত স্ট্রিং, ইন্টিজার বা বাইনারি ডেটা পাঠাতে আপনি সঠিক সেরিয়ালাইজার ব্যবহার করবেন।
key.serializer=org.apache.kafka.common.serialization.StringSerializer
3. value.serializer
এটি ভ্যালু (Value) সেরিয়ালাইজারের জন্য ব্যবহৃত হয়। এটি নির্ধারণ করে ডেটা কীভাবে সেরিয়ালাইজ হবে। সাধারণভাবে আপনি স্ট্রিং বা JSON সেরিয়ালাইজার ব্যবহার করতে পারেন।
value.serializer=org.apache.kafka.common.serialization.StringSerializer
4. acks
এই কনফিগারেশন দ্বারা, আপনি নির্ধারণ করেন কতজন ব্রোকার নিশ্চিত করার পরে প্রডিউসার একটি মেসেজ সফলভাবে পাঠানো হয়েছে বলে মনে করবে। এর তিনটি অপশন থাকে:
- 0: মেসেজের জন্য কোনো অ্যাকনলেজমেন্টের প্রয়োজন নেই।
- 1: লিড ব্রোকার নিশ্চিত করার পর মেসেজ সফল হবে।
- all: সমস্ত ব্রোকার অ্যাকনলেজ করার পর মেসেজ সফল হবে।
acks=all
5. retries
যদি প্রডিউসার কোনো কারণে মেসেজ প্রেরণ করতে না পারে, তাহলে এই কনফিগারেশন দ্বারা আপনি পুনরায় প্রেরণের চেষ্টা নির্ধারণ করতে পারেন। এটি ডিফল্টভাবে ০ থাকে।
retries=3
6. linger.ms
এই অপশনটি প্রডিউসারের মধ্যে ডেটা জমা হওয়ার সময় নির্ধারণ করে। এটি একটি বিলম্ব সময় হিসেবে কাজ করে, যেখানে প্রডিউসার একাধিক মেসেজ জড়ো করে একটি গ্রুপ হিসেবে প্রেরণ করতে পারে।
linger.ms=5
7. batch.size
এই কনফিগারেশন প্রডিউসারকে নির্ধারণ করতে সহায়তা করে যে কতটুকু ডেটা একত্রিত হওয়ার পর সেটি কাফকা ব্রোকারে পাঠানো হবে। এটি কিলোবাইট (KB) বা মেগাবাইট (MB) হিসেবে নির্ধারণ করা যায়।
batch.size=16384
8. compression.type
এই কনফিগারেশনটি প্রডিউসারকে ডেটা সংকুচিত করার জন্য ব্যবহৃত হয়। এটি একটি গুরুত্বপূর্ণ অপশন, বিশেষ করে বড় পরিমাণ ডেটার ক্ষেত্রে।
compression.type=gzip
9. client.id
এই কনফিগারেশন দ্বারা, আপনি প্রডিউসারের ক্লায়েন্ট আইডি সেট করতে পারেন। এটি কাফকা ক্লাস্টারে লগিং এবং মনিটরিংয়ের জন্য ব্যবহার করা হয়।
client.id=producer-client-1
Kafka Producer API উদাহরণ
এখন আমরা একটি সাধারণ Java প্রোগ্রাম দিয়ে Kafka প্রডিউসার কিভাবে কাজ করে তা দেখব।
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Kafka Producer Configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("linger.ms", 5);
// Creating Producer object
Producer<String, String> producer = new KafkaProducer<>(props);
// Sending message to Kafka Topic
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "Hello Kafka " + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
producer.send(record);
}
// Close the producer
producer.close();
}
}
এই প্রোগ্রামে:
- bootstrap.servers: এটি কাফকা ব্রোকারের অ্যাড্রেস।
- key.serializer এবং value.serializer: কী এবং ভ্যালু সেরিয়ালাইজার।
- acks: সমস্ত ব্রোকারের অ্যাকনলেজমেন্টের পরে প্রডিউসার মেসেজ সফল বলে মনে করবে।
সারাংশ
Kafka Producer API হল কাফকা টপিকে ডেটা পাঠানোর প্রধান উপায়। প্রডিউসার কনফিগারেশন সেটিংসের মাধ্যমে আপনি বিভিন্ন অপশন কাস্টমাইজ করতে পারেন, যেমন ব্রোকার অ্যাড্রেস, সেরিয়ালাইজেশন, রিটার্ন অ্যাকনলেজমেন্ট (acks), এবং ডেটা কমপ্রেশন। সঠিক কনফিগারেশন এবং সেরিয়ালাইজেশন ব্যবহার করে আপনি কাফকায় কার্যকরভাবে ডেটা পাঠাতে সক্ষম হবেন।
Read more