Kafka একটি উচ্চ পারফরম্যান্স, স্কেলেবল, এবং ডিস্ট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত লগ ডাটা বা স্ট্রিমিং ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়। স্কালার মধ্যে Kafka ব্যবহার করতে, আপনাকে কিছু নির্দিষ্ট লায়ব্রেরি এবং ডিপেন্ডেন্সি যুক্ত করতে হবে, যেমন Apache Kafka এবং Akka Streams।
এই গাইডে, আমরা স্কালায় Kafka ব্যবহার করার জন্য প্রয়োজনীয় স্টেপগুলি ব্যাখ্যা করব, যেমন Kafka প্রডিউসার (Producer) এবং কনজিউমার (Consumer) তৈরি করা এবং Kafka এর মাধ্যমে ডাটা প্রেরণ করা।
১. Kafka এবং sbt ডিপেন্ডেন্সি যুক্ত করা
স্কালায় Kafka ব্যবহার করতে, প্রথমে আপনাকে Kafka Client Library এর ডিপেন্ডেন্সি sbt (Scala Build Tool) ফাইলে যোগ করতে হবে।
sbt ফাইলে Kafka ডিপেন্ডেন্সি যুক্ত করা:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.8.0"kafka-clients: Kafka এর ক্লায়েন্ট লাইব্রেরি যা প্রডিউসার এবং কনজিউমারের জন্য ব্যবহৃত হয়।kafka: Scala Kafka লাইব্রেরি।
২. Kafka প্রডিউসার তৈরি করা (Producer)
Kafka প্রডিউসার একটি অ্যাপ্লিকেশন যা ডাটা Kafka টপিকে পাঠায়। এখানে আমরা একটি প্রডিউসার তৈরি করব যা একটি টপিকে বার্তা প্রেরণ করবে।
Kafka প্রডিউসার কোড:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
object KafkaProducerExample {
def main(args: Array[String]): Unit = {
// Kafka প্রডিউসার কনফিগারেশন
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Kafka সার্ভার
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Kafka প্রডিউসার তৈরি করা
val producer = new KafkaProducer[String, String](props)
// Kafka টপিকের জন্য বার্তা পাঠানো
val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka!")
producer.send(record) // বার্তা পাঠানো
println("Message sent successfully!")
// প্রডিউসার বন্ধ করা
producer.close()
}
}এখানে:
props.put: Kafka সার্ভারের ঠিকানা এবং সিরিয়ালাইজার সেট করা হয়েছে।ProducerRecord: Kafka টপিক এবং বার্তা তৈরি করা হয়েছে।producer.send(record): বার্তা Kafka টপিকে পাঠানো হয়েছে।
টপিক তৈরি:
Kafka টপিক তৈরি করতে আপনি Kafka সার্ভারের কমান্ড লাইন টুল ব্যবহার করতে পারেন:
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1৩. Kafka কনজিউমার তৈরি করা (Consumer)
Kafka কনজিউমার একটি অ্যাপ্লিকেশন যা Kafka টপিক থেকে বার্তা গ্রহণ করে। নিচে একটি কনজিউমার তৈরি করা হয়েছে যা test-topic টপিক থেকে বার্তা নিয়ে আসে।
Kafka কনজিউমার কোড:
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, ConsumerRecord}
import java.util.Collections
object KafkaConsumerExample {
def main(args: Array[String]): Unit = {
// Kafka কনজিউমার কনফিগারেশন
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Kafka সার্ভার
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// Kafka কনজিউমার তৈরি করা
val consumer = new KafkaConsumer[String, String](props)
// টপিক সাবস্ক্রাইব করা
consumer.subscribe(Collections.singletonList("test-topic"))
// বার্তা পড়া
while (true) {
val records = consumer.poll(1000) // 1 সেকেন্ড পর্যন্ত অপেক্ষা করবে
for (record: ConsumerRecord[String, String] <- records.asScala) {
println(s"Consumed record: key = ${record.key}, value = ${record.value}, partition = ${record.partition}, offset = ${record.offset}")
}
}
}
}এখানে:
props.put: কনজিউমারের কনফিগারেশন সেট করা হয়েছে।consumer.subscribe: কনজিউমারকেtest-topicটপিক সাবস্ক্রাইব করানো হয়েছে।consumer.poll: বার্তা নিয়ে আসার জন্য ব্যবহৃত হয়।
কনজিউমার চলমান থাকলে:
- কনজিউমার প্রাপ্ত বার্তা স্ক্যান করবে এবং কনসোল এ প্রিন্ট করবে।
৪. ফিউচার এবং Kafka
স্কালার অ্যাসিনক্রোনাস প্রোগ্রামিংয়ের জন্য ফিউচার ব্যবহৃত হয়। Kafka প্রডিউসার এবং কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করতে ফিউচার ব্যবহার করা যেতে পারে।
উদাহরণ: ফিউচার ব্যবহার করে Kafka প্রডিউসার
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object KafkaProducerWithFuture {
def main(args: Array[String]): Unit = {
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val future = Future {
val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka with Future!")
producer.send(record)
println("Message sent successfully!")
}
// Wait for future completion
future.onComplete {
case scala.util.Success(_) => println("Message sent and future completed.")
case scala.util.Failure(e) => println(s"Failed: ${e.getMessage}")
}
// Producer close
producer.close()
}
}এখানে:
Futureব্যবহৃত হয়েছে যাতে প্রডিউসারের কাজ অ্যাসিনক্রোনাসভাবে করা যায় এবং ভবিষ্যতে তার ফলাফল পাওয়া যায়।
৫. Kafka Streams এবং Akka Streams
Play Framework এবং Akka Streams এর মাধ্যমে Kafka Streams ব্যবহৃত হতে পারে, যা স্কালায় ডাটা স্ট্রিমিং প্রক্রিয়াকরণের জন্য আরো সুবিধাজনক। এটি উচ্চ পারফরম্যান্স এবং স্ট্রিমিং ডাটা হ্যান্ডলিংয়ের জন্য ব্যবহৃত হয়।
সারাংশ
- Kafka প্রডিউসার: ডাটা Kafka টপিকে পাঠায়।
- Kafka কনজিউমার: Kafka টপিক থেকে ডাটা গ্রহণ করে।
- ফিউচার: Kafka প্রডিউসার বা কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করার জন্য ফিউচার ব্যবহৃত হয়।
- Kafka Streams: এটি একটি উন্নত স্ট্রিমিং লাইব্রেরি, যা Kafka ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়।
Kafka স্কালায় ব্যবহার করতে হলে, Kafka ক্লায়েন্ট লাইব্রেরি এবং প্রপার কনফিগারেশন ব্যবহার করে ডাটা প্রেরণ এবং গ্রহণ করা যেতে পারে।