স্কালায় Kafka ব্যবহার

স্কালা ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং - স্কালা প্রোগ্রামিং (Scala Programming) - Computer Programming

209

Kafka একটি উচ্চ পারফরম্যান্স, স্কেলেবল, এবং ডিস্ট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত লগ ডাটা বা স্ট্রিমিং ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়। স্কালার মধ্যে Kafka ব্যবহার করতে, আপনাকে কিছু নির্দিষ্ট লায়ব্রেরি এবং ডিপেন্ডেন্সি যুক্ত করতে হবে, যেমন Apache Kafka এবং Akka Streams

এই গাইডে, আমরা স্কালায় Kafka ব্যবহার করার জন্য প্রয়োজনীয় স্টেপগুলি ব্যাখ্যা করব, যেমন Kafka প্রডিউসার (Producer) এবং কনজিউমার (Consumer) তৈরি করা এবং Kafka এর মাধ্যমে ডাটা প্রেরণ করা।


১. Kafka এবং sbt ডিপেন্ডেন্সি যুক্ত করা

স্কালায় Kafka ব্যবহার করতে, প্রথমে আপনাকে Kafka Client Library এর ডিপেন্ডেন্সি sbt (Scala Build Tool) ফাইলে যোগ করতে হবে।

sbt ফাইলে Kafka ডিপেন্ডেন্সি যুক্ত করা:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.8.0"
  • kafka-clients: Kafka এর ক্লায়েন্ট লাইব্রেরি যা প্রডিউসার এবং কনজিউমারের জন্য ব্যবহৃত হয়।
  • kafka: Scala Kafka লাইব্রেরি।

২. Kafka প্রডিউসার তৈরি করা (Producer)

Kafka প্রডিউসার একটি অ্যাপ্লিকেশন যা ডাটা Kafka টপিকে পাঠায়। এখানে আমরা একটি প্রডিউসার তৈরি করব যা একটি টপিকে বার্তা প্রেরণ করবে।

Kafka প্রডিউসার কোড:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties

object KafkaProducerExample {
  def main(args: Array[String]): Unit = {

    // Kafka প্রডিউসার কনফিগারেশন
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // Kafka সার্ভার
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Kafka প্রডিউসার তৈরি করা
    val producer = new KafkaProducer[String, String](props)

    // Kafka টপিকের জন্য বার্তা পাঠানো
    val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka!")
    producer.send(record)  // বার্তা পাঠানো
    println("Message sent successfully!")

    // প্রডিউসার বন্ধ করা
    producer.close()
  }
}

এখানে:

  • props.put: Kafka সার্ভারের ঠিকানা এবং সিরিয়ালাইজার সেট করা হয়েছে।
  • ProducerRecord: Kafka টপিক এবং বার্তা তৈরি করা হয়েছে।
  • producer.send(record): বার্তা Kafka টপিকে পাঠানো হয়েছে।

টপিক তৈরি:

Kafka টপিক তৈরি করতে আপনি Kafka সার্ভারের কমান্ড লাইন টুল ব্যবহার করতে পারেন:

kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

৩. Kafka কনজিউমার তৈরি করা (Consumer)

Kafka কনজিউমার একটি অ্যাপ্লিকেশন যা Kafka টপিক থেকে বার্তা গ্রহণ করে। নিচে একটি কনজিউমার তৈরি করা হয়েছে যা test-topic টপিক থেকে বার্তা নিয়ে আসে।

Kafka কনজিউমার কোড:

import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, ConsumerRecord}
import java.util.Collections

object KafkaConsumerExample {
  def main(args: Array[String]): Unit = {

    // Kafka কনজিউমার কনফিগারেশন
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // Kafka সার্ভার
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    // Kafka কনজিউমার তৈরি করা
    val consumer = new KafkaConsumer[String, String](props)

    // টপিক সাবস্ক্রাইব করা
    consumer.subscribe(Collections.singletonList("test-topic"))

    // বার্তা পড়া
    while (true) {
      val records = consumer.poll(1000)  // 1 সেকেন্ড পর্যন্ত অপেক্ষা করবে
      for (record: ConsumerRecord[String, String] <- records.asScala) {
        println(s"Consumed record: key = ${record.key}, value = ${record.value}, partition = ${record.partition}, offset = ${record.offset}")
      }
    }
  }
}

এখানে:

  • props.put: কনজিউমারের কনফিগারেশন সেট করা হয়েছে।
  • consumer.subscribe: কনজিউমারকে test-topic টপিক সাবস্ক্রাইব করানো হয়েছে।
  • consumer.poll: বার্তা নিয়ে আসার জন্য ব্যবহৃত হয়।

কনজিউমার চলমান থাকলে:

  • কনজিউমার প্রাপ্ত বার্তা স্ক্যান করবে এবং কনসোল এ প্রিন্ট করবে।

৪. ফিউচার এবং Kafka

স্কালার অ্যাসিনক্রোনাস প্রোগ্রামিংয়ের জন্য ফিউচার ব্যবহৃত হয়। Kafka প্রডিউসার এবং কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করতে ফিউচার ব্যবহার করা যেতে পারে।

উদাহরণ: ফিউচার ব্যবহার করে Kafka প্রডিউসার

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object KafkaProducerWithFuture {
  def main(args: Array[String]): Unit = {

    val props = new java.util.Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val future = Future {
      val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka with Future!")
      producer.send(record)
      println("Message sent successfully!")
    }

    // Wait for future completion
    future.onComplete {
      case scala.util.Success(_) => println("Message sent and future completed.")
      case scala.util.Failure(e) => println(s"Failed: ${e.getMessage}")
    }

    // Producer close
    producer.close()
  }
}

এখানে:

  • Future ব্যবহৃত হয়েছে যাতে প্রডিউসারের কাজ অ্যাসিনক্রোনাসভাবে করা যায় এবং ভবিষ্যতে তার ফলাফল পাওয়া যায়।

৫. Kafka Streams এবং Akka Streams

Play Framework এবং Akka Streams এর মাধ্যমে Kafka Streams ব্যবহৃত হতে পারে, যা স্কালায় ডাটা স্ট্রিমিং প্রক্রিয়াকরণের জন্য আরো সুবিধাজনক। এটি উচ্চ পারফরম্যান্স এবং স্ট্রিমিং ডাটা হ্যান্ডলিংয়ের জন্য ব্যবহৃত হয়।


সারাংশ

  • Kafka প্রডিউসার: ডাটা Kafka টপিকে পাঠায়।
  • Kafka কনজিউমার: Kafka টপিক থেকে ডাটা গ্রহণ করে।
  • ফিউচার: Kafka প্রডিউসার বা কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করার জন্য ফিউচার ব্যবহৃত হয়।
  • Kafka Streams: এটি একটি উন্নত স্ট্রিমিং লাইব্রেরি, যা Kafka ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়।

Kafka স্কালায় ব্যবহার করতে হলে, Kafka ক্লায়েন্ট লাইব্রেরি এবং প্রপার কনফিগারেশন ব্যবহার করে ডাটা প্রেরণ এবং গ্রহণ করা যেতে পারে।

Content added By
Promotion

Are you sure to start over?

Loading...