Skill

স্কালা ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং

স্কালা প্রোগ্রামিং (Scala Programming) - Computer Programming

251

স্কালা ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং ব্যবস্থাপনা তৈরি করার জন্য একটি শক্তিশালী ভাষা। এটি Akka এবং Apache Kafka এর মতো লাইব্রেরি এবং ফ্রেমওয়ার্ক সমর্থন করে, যা ডিস্ট্রিবিউটেড সিস্টেমের কনসেপ্টগুলো কার্যকরভাবে পরিচালনা করতে সাহায্য করে। এই টিউটোরিয়ালে, আমরা Akka এবং Kafka ব্যবহার করে স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং কনসেপ্ট নিয়ে আলোচনা করব।


১. Akka - Distributed Systems and Actor Model

Akka একটি উচ্চ কার্যক্ষম এবং স্কেলেবল ফ্রেমওয়ার্ক যা স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং পরিচালনার জন্য ব্যবহৃত হয়। Akka Actor Model ডিস্ট্রিবিউটেড সিস্টেমের জন্য খুবই উপযোগী, যেখানে বিভিন্ন অ্যাক্টর একে অপরের সাথে মেসেজ পাসের মাধ্যমে যোগাযোগ করে।

১.১ Akka Actor Model

Akka Actor Model মেসেজিং এবং কনকারেন্সি পরিচালনার জন্য এক্সট্রা-লাইটweight থ্রেড ব্যবহার করে অ্যাক্টর তৈরি এবং পরিচালনা করে। প্রতিটি অ্যাক্টর একটি একক থ্রেডে চলমান থাকে এবং অন্য অ্যাক্টরের সাথে মেসেজ পাস করে কাজ করে।

Akka Actor Example
import akka.actor.{Actor, ActorSystem, Props}

// Define Actor class
class MyActor extends Actor {
  def receive: Receive = {
    case "ping" => println("Pong")
    case _ => println("Unknown message")
  }
}

object AkkaExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("MyActorSystem")
    val myActor = system.actorOf(Props[MyActor], "myActor")

    myActor ! "ping"   // Send a message to the actor
    myActor ! "hello"  // Send an unknown message

    system.terminate() // Shutdown the ActorSystem
  }
}

এখানে:

  • Actor একটি সিস্টেমের একক ইউনিট, যা receive মেথডের মাধ্যমে মেসেজ হ্যান্ডল করে।
  • ActorSystem ব্যবহারের মাধ্যমে অ্যাক্টর তৈরি এবং পরিচালনা করা হচ্ছে।

১.২ Akka Cluster for Distributed Systems

Akka Cluster ব্যবহার করে একাধিক নোডের মধ্যে অ্যাক্টরস তৈরি এবং মেসেজ পাস করা যায়, যা একটি ডিস্ট্রিবিউটেড সিস্টেম তৈরি করে।

Akka Cluster Setup Example:

import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.Props
import akka.actor.ActorSystem

class ClusterListener extends Actor {
  val cluster = Cluster(context.system)
  
  def receive = {
    case MemberUp(member) => 
      println(s"Member is Up: ${member.address}")
    case MemberRemoved(member, previousStatus) => 
      println(s"Member removed: ${member.address} with status $previousStatus")
  }
}

object AkkaClusterExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val clusterListener = system.actorOf(Props[ClusterListener], name = "clusterListener")
    
    println("Cluster system started!")
  }
}

এখানে:

  • Akka Cluster ব্যবহৃত হয়েছে, যা মেসেজ পাস করার জন্য একাধিক নোডের মধ্যে অ্যাক্টরস নিয়ে একটি ডিস্ট্রিবিউটেড সিস্টেম তৈরি করে।
  • ClusterListener অ্যাক্টরটি সিস্টেমে যোগ হওয়া এবং বাদ পড়া মেম্বারস মনিটর করে।

২. Apache Kafka - Messaging and Distributed Data Streaming

Apache Kafka একটি ডিসট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত মেসেজ পাসিং, লগ সংগ্রহ, এবং ডেটা স্ট্রিমিংয়ের জন্য ব্যবহৃত হয়। এটি উচ্চ কার্যক্ষমতা, স্কেলেবিলিটি এবং প্রোটোকল সহজতার জন্য অত্যন্ত জনপ্রিয়।

২.১ Kafka Producer and Consumer

Kafka ক্লায়েন্টের মাধ্যমে ডেটা প্রযোজনা এবং গ্রহণ করা যায়। Producer ডেটা স্ট্রীম পাঠানোর কাজ করে এবং Consumer সেই ডেটা গ্রহণ করে।

Kafka Producer Example
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties

object KafkaProducerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    for (i <- 1 to 10) {
      val message = s"Message $i"
      val record = new ProducerRecord[String, String]("test-topic", "key", message)
      producer.send(record)
      println(s"Sent: $message")
    }

    producer.close()
  }
}

এখানে:

  • KafkaProducer ব্যবহার করে একটি মেসেজ test-topic টপিকে পাঠানো হয়েছে।
Kafka Consumer Example
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, Collections}

object KafkaConsumerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test-topic"))

    while (true) {
      val records = consumer.poll(1000) // Poll every second
      records.forEach(record => println(s"Consumed: ${record.value()}"))
    }
  }
}

এখানে:

  • KafkaConsumer ব্যবহার করে test-topic থেকে মেসেজগুলো গ্রহণ করা হয়েছে।
  • consumer.poll() মেথডের মাধ্যমে কনজিউমার মেসেজ পেতে পোল করছে।

২.২ Kafka and Akka Integration

Akka এবং Kafka একসাথে ব্যবহৃত হতে পারে ডিস্ট্রিবিউটেড সিস্টেমে মেসেজ পাসিং এবং ডেটা স্ট্রিমিংয়ের জন্য। Akka Streams এবং Kafka একত্রে ব্যবহার করে একটি স্কেলেবল মেসেজিং সিস্টেম তৈরি করা যেতে পারে।

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kafka.scaladsl.Consumer
import akka.stream.alpakka.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object AkkaKafkaIntegration {
  implicit val system = ActorSystem("AkkaKafkaIntegration")
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("test-group")

  def main(args: Array[String]): Unit = {
    val control = Consumer.plainSource(consumerSettings, Subscriptions.topics("test-topic"))
      .mapAsync(1)(msg => {
        println(s"Consumed message: ${msg.value()}")
        scala.concurrent.Future.successful(msg)
      })
      .toMat(akka.stream.Sink.ignore)(akka.stream.Materializer.materializer)
      .run()

    println("Consuming messages from Kafka")
  }
}

এখানে:

  • Akka Streams এবং Alpakka Kafka ব্যবহার করে Kafka Consumer তৈরি করা হয়েছে। এটি একে একে মেসেজগুলো গ্রহণ এবং প্রসেস করছে।

সারাংশ

স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং ব্যবস্থাপনা করার জন্য শক্তিশালী ফ্রেমওয়ার্ক এবং লাইব্রেরি রয়েছে। Akka এফেক্টিভ মেসেজ পাসিং এবং কনকারেন্সি সাপোর্টের জন্য ব্যবহার করা হয়, এবং Kafka ডিস্ট্রিবিউটেড ডেটা স্ট্রিমিং এবং মেসেজিং জন্য খুবই জনপ্রিয়। এই ফ্রেমওয়ার্কগুলো আপনাকে স্কেলেবল এবং পারফরম্যান্ট ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং সিস্টেম তৈরি করতে সাহায্য করে।

Content added By

Kafka একটি উচ্চ পারফরম্যান্স, স্কেলেবল, এবং ডিস্ট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত লগ ডাটা বা স্ট্রিমিং ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়। স্কালার মধ্যে Kafka ব্যবহার করতে, আপনাকে কিছু নির্দিষ্ট লায়ব্রেরি এবং ডিপেন্ডেন্সি যুক্ত করতে হবে, যেমন Apache Kafka এবং Akka Streams

এই গাইডে, আমরা স্কালায় Kafka ব্যবহার করার জন্য প্রয়োজনীয় স্টেপগুলি ব্যাখ্যা করব, যেমন Kafka প্রডিউসার (Producer) এবং কনজিউমার (Consumer) তৈরি করা এবং Kafka এর মাধ্যমে ডাটা প্রেরণ করা।


১. Kafka এবং sbt ডিপেন্ডেন্সি যুক্ত করা

স্কালায় Kafka ব্যবহার করতে, প্রথমে আপনাকে Kafka Client Library এর ডিপেন্ডেন্সি sbt (Scala Build Tool) ফাইলে যোগ করতে হবে।

sbt ফাইলে Kafka ডিপেন্ডেন্সি যুক্ত করা:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.8.0"
  • kafka-clients: Kafka এর ক্লায়েন্ট লাইব্রেরি যা প্রডিউসার এবং কনজিউমারের জন্য ব্যবহৃত হয়।
  • kafka: Scala Kafka লাইব্রেরি।

২. Kafka প্রডিউসার তৈরি করা (Producer)

Kafka প্রডিউসার একটি অ্যাপ্লিকেশন যা ডাটা Kafka টপিকে পাঠায়। এখানে আমরা একটি প্রডিউসার তৈরি করব যা একটি টপিকে বার্তা প্রেরণ করবে।

Kafka প্রডিউসার কোড:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties

object KafkaProducerExample {
  def main(args: Array[String]): Unit = {

    // Kafka প্রডিউসার কনফিগারেশন
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // Kafka সার্ভার
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Kafka প্রডিউসার তৈরি করা
    val producer = new KafkaProducer[String, String](props)

    // Kafka টপিকের জন্য বার্তা পাঠানো
    val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka!")
    producer.send(record)  // বার্তা পাঠানো
    println("Message sent successfully!")

    // প্রডিউসার বন্ধ করা
    producer.close()
  }
}

এখানে:

  • props.put: Kafka সার্ভারের ঠিকানা এবং সিরিয়ালাইজার সেট করা হয়েছে।
  • ProducerRecord: Kafka টপিক এবং বার্তা তৈরি করা হয়েছে।
  • producer.send(record): বার্তা Kafka টপিকে পাঠানো হয়েছে।

টপিক তৈরি:

Kafka টপিক তৈরি করতে আপনি Kafka সার্ভারের কমান্ড লাইন টুল ব্যবহার করতে পারেন:

kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

৩. Kafka কনজিউমার তৈরি করা (Consumer)

Kafka কনজিউমার একটি অ্যাপ্লিকেশন যা Kafka টপিক থেকে বার্তা গ্রহণ করে। নিচে একটি কনজিউমার তৈরি করা হয়েছে যা test-topic টপিক থেকে বার্তা নিয়ে আসে।

Kafka কনজিউমার কোড:

import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, ConsumerRecord}
import java.util.Collections

object KafkaConsumerExample {
  def main(args: Array[String]): Unit = {

    // Kafka কনজিউমার কনফিগারেশন
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // Kafka সার্ভার
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    // Kafka কনজিউমার তৈরি করা
    val consumer = new KafkaConsumer[String, String](props)

    // টপিক সাবস্ক্রাইব করা
    consumer.subscribe(Collections.singletonList("test-topic"))

    // বার্তা পড়া
    while (true) {
      val records = consumer.poll(1000)  // 1 সেকেন্ড পর্যন্ত অপেক্ষা করবে
      for (record: ConsumerRecord[String, String] <- records.asScala) {
        println(s"Consumed record: key = ${record.key}, value = ${record.value}, partition = ${record.partition}, offset = ${record.offset}")
      }
    }
  }
}

এখানে:

  • props.put: কনজিউমারের কনফিগারেশন সেট করা হয়েছে।
  • consumer.subscribe: কনজিউমারকে test-topic টপিক সাবস্ক্রাইব করানো হয়েছে।
  • consumer.poll: বার্তা নিয়ে আসার জন্য ব্যবহৃত হয়।

কনজিউমার চলমান থাকলে:

  • কনজিউমার প্রাপ্ত বার্তা স্ক্যান করবে এবং কনসোল এ প্রিন্ট করবে।

৪. ফিউচার এবং Kafka

স্কালার অ্যাসিনক্রোনাস প্রোগ্রামিংয়ের জন্য ফিউচার ব্যবহৃত হয়। Kafka প্রডিউসার এবং কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করতে ফিউচার ব্যবহার করা যেতে পারে।

উদাহরণ: ফিউচার ব্যবহার করে Kafka প্রডিউসার

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object KafkaProducerWithFuture {
  def main(args: Array[String]): Unit = {

    val props = new java.util.Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val future = Future {
      val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka with Future!")
      producer.send(record)
      println("Message sent successfully!")
    }

    // Wait for future completion
    future.onComplete {
      case scala.util.Success(_) => println("Message sent and future completed.")
      case scala.util.Failure(e) => println(s"Failed: ${e.getMessage}")
    }

    // Producer close
    producer.close()
  }
}

এখানে:

  • Future ব্যবহৃত হয়েছে যাতে প্রডিউসারের কাজ অ্যাসিনক্রোনাসভাবে করা যায় এবং ভবিষ্যতে তার ফলাফল পাওয়া যায়।

৫. Kafka Streams এবং Akka Streams

Play Framework এবং Akka Streams এর মাধ্যমে Kafka Streams ব্যবহৃত হতে পারে, যা স্কালায় ডাটা স্ট্রিমিং প্রক্রিয়াকরণের জন্য আরো সুবিধাজনক। এটি উচ্চ পারফরম্যান্স এবং স্ট্রিমিং ডাটা হ্যান্ডলিংয়ের জন্য ব্যবহৃত হয়।


সারাংশ

  • Kafka প্রডিউসার: ডাটা Kafka টপিকে পাঠায়।
  • Kafka কনজিউমার: Kafka টপিক থেকে ডাটা গ্রহণ করে।
  • ফিউচার: Kafka প্রডিউসার বা কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করার জন্য ফিউচার ব্যবহৃত হয়।
  • Kafka Streams: এটি একটি উন্নত স্ট্রিমিং লাইব্রেরি, যা Kafka ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়।

Kafka স্কালায় ব্যবহার করতে হলে, Kafka ক্লায়েন্ট লাইব্রেরি এবং প্রপার কনফিগারেশন ব্যবহার করে ডাটা প্রেরণ এবং গ্রহণ করা যেতে পারে।

Content added By

Akka হল একটি শক্তিশালী ফ্রেমওয়ার্ক যা Actor Model ব্যবহার করে কনকারেন্ট, ডিসট্রিবিউটেড, এবং রেসিলিয়েন্ট অ্যাপ্লিকেশন তৈরি করতে ব্যবহৃত হয়। Akka Cluster এবং Akka Remote দুটি ফিচার প্রদান করে, যা ডিস্ট্রিবিউটেড অ্যাপ্লিকেশন তৈরির জন্য অত্যন্ত উপযোগী। এই দুটি ফিচার ব্যবহার করে আপনি আপনার অ্যাক্টরদের একাধিক নোডে বা মেশিনে স্থাপন করতে পারেন এবং তারা একে অপরের সাথে যোগাযোগ করতে পারে।

১. Akka Cluster

Akka Cluster একটি ডিসট্রিবিউটেড সিস্টেম তৈরি করতে সাহায্য করে, যেখানে একাধিক নোড বা মেশিনের মধ্যে অ্যাক্টররা একে অপরের সাথে যোগাযোগ করতে পারে। এটি failover এবং scalability সুবিধা প্রদান করে। Akka Cluster আপনাকে ডিস্ট্রিবিউটেড অ্যাপ্লিকেশন তৈরির জন্য নিম্নলিখিত সুবিধা প্রদান করে:

  • Clustering: একাধিক নোডের মধ্যে অ্যাক্টরগুলিকে যুক্ত করা।
  • Failover: এক নোড ব্যর্থ হলে অন্য নোড থেকে অ্যাক্টর পুনরুদ্ধার করা।
  • Eventual Consistency: নোডগুলির মধ্যে ডেটা সমন্বয় করা।
  • Shard: একাধিক নোডের মধ্যে অ্যাক্টরদের শার্ড করা (Shard-based routing)।

১.১ Akka Cluster কনফিগারেশন

Akka Cluster ব্যবহার করতে হলে আপনাকে application.conf ফাইলে কিছু কনফিগারেশন করতে হবে। এখানে একটি সাধারণ Akka Cluster কনফিগারেশন দেখানো হলো:

akka {
  actor {
    provider = "cluster"  // Enable Akka Cluster
  }
  
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"
    ]
    auto-down-unreachable-after = 10s
  }
}

এখানে:

  • hostname এবং port সেট করা হয়েছে, যা নোডের ঠিকানা নির্ধারণ করে।
  • seed-nodes হল সিড নোডগুলির তালিকা, যেগুলি ক্লাস্টারের অন্যান্য নোডগুলোকে জানায়।
  • auto-down-unreachable-after সেকেন্ডে সেট করা হয়েছে, যার মাধ্যমে একটি নোড যদি দীর্ঘ সময়ের জন্য পৌঁছাতে না পারে, তবে তা ডাউন হিসেবে চিহ্নিত হবে।

১.২ Akka Cluster কোড উদাহরণ

এখন আমরা একটি সিম্পল Akka Cluster উদাহরণ তৈরি করি যেখানে দুইটি নোড অ্যাক্টরদের মধ্যে যোগাযোগ করবে।

ClusterApp.scala:

import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{MemberUp, MemberEvent}

class ClusterListener extends Actor {
  val cluster = Cluster(context.system)

  // Subscribe to cluster changes
  cluster.subscribe(self, classOf[MemberEvent])

  def receive = {
    case MemberUp(member) =>
      println(s"Member is Up: ${member.address}")
    case _ => // Ignore other messages
  }
}

object ClusterApp extends App {
  val system = ActorSystem("ClusterSystem")
  val listener = system.actorOf(Props[ClusterListener], name = "clusterListener")

  // Join the cluster
  val cluster = Cluster(system)
  cluster.join(cluster.selfAddress)

  // Keep the application running
  println("Cluster system is running.")
}

এখানে:

  • ClusterListener অ্যাক্টরটি ক্লাস্টারের সদস্যদের পরিবর্তন দেখবে এবং সদস্যদের MemberUp স্টেটাস মেসেজ গ্রহণ করবে।
  • cluster.join(cluster.selfAddress) এর মাধ্যমে নোডটি ক্লাস্টারে যোগদান করবে।

২. Akka Remote

Akka Remote আপনাকে আপনার অ্যাক্টরদের একাধিক মেশিনে বা নোডে চালানোর ক্ষমতা দেয় এবং তারা একে অপরের সাথে যোগাযোগ করতে পারে। Akka Remote ব্যবহৃত হয় যখন আপনি একাধিক অ্যাক্টর সিস্টেমকে আলাদা মেশিনে বা প্রক্রিয়ায় চালাতে চান।

২.১ Akka Remote কনফিগারেশন

Akka Remote ব্যবহার করার জন্য আপনাকে application.conf ফাইলে কনফিগারেশন করতে হবে।

akka {
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
}

এখানে, hostname এবং port সেট করা হয়েছে, যা নির্দিষ্ট করে দেয় যে এই অ্যাক্টর সিস্টেম কোন মেশিনে চলবে এবং কোন পোর্টে এটি অ্যাক্সেস করা যাবে।

২.২ Akka Remote উদাহরণ

এখন একটি উদাহরণ দেখানো হলো যেখানে Akka Remote ব্যবহার করা হচ্ছে দুটি আলাদা সিস্টেমের মধ্যে যোগাযোগের জন্য।

RemoteSender.scala (Sender Side):

import akka.actor.{Actor, ActorSystem, Props}
import akka.remote.RemoteScope

class RemoteActor extends Actor {
  def receive = {
    case msg: String => println(s"Received message: $msg")
  }
}

object RemoteSender extends App {
  val system = ActorSystem("RemoteSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")

  // Send a message to the remote actor
  remoteActor ! "Hello from remote system"
}

RemoteReceiver.scala (Receiver Side):

import akka.actor.{Actor, ActorSystem, Props}

class RemoteActor extends Actor {
  def receive = {
    case msg: String => println(s"Received message: $msg")
  }
}

object RemoteReceiver extends App {
  val system = ActorSystem("RemoteSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")

  // Wait for messages
}

এখানে:

  • RemoteSender অ্যাক্টরটি একটি মেসেজ "Hello from remote system" পাঠাচ্ছে।
  • RemoteReceiver অ্যাক্টরটি রিসিভ করে মেসেজটি প্রিন্ট করবে।

৩. Akka Cluster এবং Remote-এর সুবিধা

  • Scalability: Akka Cluster ব্যবহার করে আপনি সহজেই অ্যাপ্লিকেশন স্কেল করতে পারেন। একাধিক নোড যোগ করে আপনি আপনার সিস্টেমকে স্কেল করতে পারবেন।
  • Fault Tolerance: Akka Cluster অটোমেটিকালি ব্যর্থ নোডকে চিনতে পারে এবং সিস্টেম পুনরুদ্ধার করতে পারে।
  • High Availability: Akka Remote এবং Cluster ব্যবহার করে আপনার অ্যাপ্লিকেশন সর্বদা উপলব্ধ থাকবে, কারণ অ্যাক্টররা বিভিন্ন নোডের মধ্যে যোগাযোগ করতে সক্ষম।
  • Distributable Actors: আপনি আপনার অ্যাক্টরগুলোকে বিভিন্ন সিস্টেমে বিতরণ করতে পারবেন, যা আপনাকে বড় সিস্টেম ডিজাইন করতে সাহায্য করবে।

সারাংশ

  • Akka Cluster হল একটি ডিসট্রিবিউটেড সিস্টেম তৈরি করার জন্য ব্যবহৃত শক্তিশালী ফিচার যা একাধিক নোডের মধ্যে অ্যাক্টরদের মধ্যে যোগাযোগ করতে সক্ষম।
  • Akka Remote আপনাকে আপনার অ্যাক্টর সিস্টেমকে বিভিন্ন সিস্টেম বা নোডে রান করতে সক্ষম করে এবং তাদের মধ্যে যোগাযোগ স্থাপন করতে সাহায্য করে।
  • Akka এর মাধ্যমে আপনি কনকারেন্ট, ডিসট্রিবিউটেড, এবং স্কেলেবল সিস্টেম তৈরি করতে পারেন যা অত্যন্ত কার্যকর এবং উচ্চ পারফরম্যান্স প্রদান করে।

Akka Cluster এবং Remote ব্যবহারের মাধ্যমে আপনি একটি সিস্টেমের বিভিন্ন অংশে একাধিক অ্যাক্টর সিস্টেমকে সিঙ্ক্রোনাইজ করতে সক্ষম হবেন, যা বড় এবং স্কেলেবল অ্যাপ্লিকেশন তৈরি করতে সাহায্য করে।

Content added By

Event Sourcing এবং CQRS (Command Query Responsibility Segregation) হল দুটি শক্তিশালী আর্কিটেকচারাল প্যাটার্ন, যা একসাথে ব্যবহৃত হলে স্কালার সিস্টেমকে আরও স্কেলেবল, পারফরম্যান্স অপটিমাইজড, এবং রিঅ্যাকটিভ করতে সাহায্য করে। এই দুটি প্যাটার্ন রিঅ্যাকটিভ ডিস্ট্রিবিউটেড সিস্টেম ডিজাইনে জনপ্রিয়, যেখানে ডেটার স্টেট পরিবর্তন এবং পড়া আলাদা করা হয় এবং ডেটার ইতিহাস সংরক্ষিত থাকে।


১. Event Sourcing

Event Sourcing হল এমন একটি আর্কিটেকচারাল প্যাটার্ন, যেখানে অ্যাপ্লিকেশন স্টেট পরিবর্তন সরাসরি ডাটাবেসে সংরক্ষণ না করে ইভেন্ট হিসেবে সংরক্ষণ করা হয়। প্রতিটি স্টেট পরিবর্তন একটি ইভেন্ট হিসেবে ধরে রাখা হয় এবং যখনই বর্তমান স্টেট প্রয়োজন হয়, তখন পুরানো ইভেন্টগুলিকে পুনরায় খেলা (replay) করা হয়। এর মাধ্যমে সিস্টেমের স্টেটের পূর্ণ ইতিহাস ট্র্যাক করা যায় এবং কোন সময়ে স্টেট হারিয়ে গেলেও, পুরনো ইভেন্টগুলির মাধ্যমে সঠিক স্টেট পুনরুদ্ধার করা সম্ভব হয়।

১.১ Event Sourcing এর মূল উপাদান:

  • ইভেন্ট: এটি একটি কার্যকলাপ বা স্টেট পরিবর্তন যা সিস্টেমে ঘটেছে।
  • ইভেন্ট স্টোর: সমস্ত ইভেন্ট সংরক্ষিত থাকে। এটি ঐতিহাসিক ডেটা যা পুনরায় খেলা যেতে পারে।
  • স্টেট পুনঃপ্রতিষ্ঠা: ডেটার বর্তমান স্টেট ইভেন্টগুলির replay মাধ্যমে পুনরুদ্ধার করা হয়।

১.২ Event Sourcing উদাহরণ

sealed trait AccountEvent
case class Deposit(amount: Int) extends AccountEvent
case class Withdraw(amount: Int) extends AccountEvent

case class Account(id: String, balance: Int)

object EventSourcingExample {

  def handleEvent(account: Account, event: AccountEvent): Account = event match {
    case Deposit(amount) => account.copy(balance = account.balance + amount)
    case Withdraw(amount) => account.copy(balance = account.balance - amount)
  }

  def main(args: Array[String]): Unit = {
    var account = Account("123", 0)

    val events: List[AccountEvent] = List(Deposit(100), Withdraw(50), Deposit(200))

    events.foreach(event => account = handleEvent(account, event))

    println(account)  // Output: Account(123,250)
  }
}

এখানে:

  • Deposit এবং Withdraw ইভেন্টগুলো Account অবজেক্টের স্টেট পরিবর্তন করে।
  • ইভেন্টগুলো replay করে অ্যাকাউন্টের বর্তমান ব্যালেন্স পুনরুদ্ধার করা হয়েছে।

২. CQRS (Command Query Responsibility Segregation)

CQRS হল একটি আর্কিটেকচার প্যাটার্ন যেখানে সিস্টেমের Command এবং Query অপারেশনগুলোকে আলাদা করা হয়।

  • Command: সিস্টেমের স্টেট পরিবর্তন করা, যেমন ডেটা আপডেট বা ইনসার্ট করা।
  • Query: সিস্টেমের স্টেট পড়া, যেমন ডেটা ফেচ করা বা কুয়েরি চালানো।

CQRS প্যাটার্নে, Write Model এবং Read Model আলাদা রাখা হয়। Command অপারেশনগুলি স্টেট পরিবর্তন করে, এবং Query অপারেশনগুলি শুধুমাত্র ডেটা পড়ে, স্টেট পরিবর্তন না করে।

২.১ CQRS এর সুবিধা:

  • পারফরম্যান্স অপটিমাইজেশন: কুয়েরি এবং কমান্ড অপারেশন আলাদা করার মাধ্যমে পারফরম্যান্স উন্নত করা যায়। কুয়েরি অপারেশনগুলিকে অপটিমাইজড এবং ফাস্ট করা যায়।
  • স্কেলেবিলিটি: পৃথক কুয়েরি এবং কমান্ড মডেল স্কেলিং সহজ করে তোলে, যেমন ডেটাবেসে পৃথক ইনডেক্সিং বা ডেটাবেস শার্ডিং।
  • সাধারণ সিস্টেমের কার্যকারিতা বৃদ্ধি: একে অপরের ওপর নির্ভরশীল না হওয়ার কারণে সিস্টেমের লোড সঠিকভাবে পরিচালনা করা যায়।

২.২ CQRS উদাহরণ

// Command Side - Write Model
case class Command(id: String, amount: Int)

class CommandHandler {
  def handleCommand(command: Command): String = {
    // Process command and perform write operation
    s"Processed command with id: ${command.id} and amount: ${command.amount}"
  }
}

// Query Side - Read Model
case class Query(id: String)

class QueryHandler {
  def handleQuery(query: Query): String = {
    // Process query and fetch data
    s"Fetched data for query with id: ${query.id}"
  }
}

object CQRSExample {
  def main(args: Array[String]): Unit = {
    val commandHandler = new CommandHandler
    val queryHandler = new QueryHandler

    // Handling command
    val commandResult = commandHandler.handleCommand(Command("123", 100))
    println(commandResult)  // Output: Processed command with id: 123 and amount: 100

    // Handling query
    val queryResult = queryHandler.handleQuery(Query("123"))
    println(queryResult)  // Output: Fetched data for query with id: 123
  }
}

এখানে:

  • CommandHandler সিস্টেমের স্টেট পরিবর্তন করে, যেমন ডেটা ইনসার্ট বা আপডেট।
  • QueryHandler শুধুমাত্র ডেটা পড়ার জন্য ব্যবহৃত হয়, ডেটা পরিবর্তন নয়।

৩. Event Sourcing এবং CQRS একসাথে ব্যবহৃত হলে

এখন যদি Event Sourcing এবং CQRS একসাথে ব্যবহৃত হয়, তবে আপনি সিস্টেমের স্টেট পরিবর্তন ইভেন্ট হিসেবে সংরক্ষণ করবেন এবং সেই ইভেন্টগুলো replay করে স্টেট পুনরুদ্ধার করবেন। সিস্টেমের Command এবং Query অপারেশন আলাদা করা হবে, যা পারফরম্যান্স এবং স্কেলেবিলিটি আরও উন্নত করবে।

৩.১ Event Sourcing এবং CQRS একসাথে উদাহরণ

// Event Sourcing with CQRS

case class OrderCreated(id: String, item: String, quantity: Int)

class CommandHandler {
  def handleCreateOrder(order: OrderCreated): Unit = {
    // Save the event to event store
    println(s"Saving event: OrderCreated(${order.id}, ${order.item}, ${order.quantity})")
  }
}

class QueryHandler {
  def getOrder(id: String): String = {
    // Fetch order details from read model
    s"Fetching order details for id: $id"
  }
}

object EventSourcingCQRSExample {
  def main(args: Array[String]): Unit = {
    val commandHandler = new CommandHandler
    val queryHandler = new QueryHandler

    // Command: Create an order
    val order = OrderCreated("123", "Laptop", 1)
    commandHandler.handleCreateOrder(order)

    // Query: Fetch the order
    val orderDetails = queryHandler.getOrder("123")
    println(orderDetails)  // Output: Fetching order details for id: 123
  }
}

এখানে:

  • CommandHandler ইভেন্টটি ডাটাবেসে সংরক্ষণ করে (এটি বাস্তবে ইভেন্ট স্টোর হতে পারে)।
  • QueryHandler ডেটা থেকে বিস্তারিত প্রাপ্তি করে, যা আলাদা read model থেকে এসেছে।

সারাংশ

  • Event Sourcing হল একটি প্যাটার্ন যেখানে সমস্ত স্টেট পরিবর্তন ইভেন্ট হিসেবে সংরক্ষণ করা হয়, এবং এই ইভেন্টগুলি replay করে স্টেট পুনরুদ্ধার করা হয়।
  • CQRS হল একটি প্যাটার্ন যেখানে Command (স্টেট পরিবর্তন) এবং Query (স্টেট পড়া) আলাদা করে দেওয়া হয়, যা পারফরম্যান্স এবং স্কেলেবিলিটি উন্নত করে।
  • Event Sourcing এবং CQRS একসাথে ব্যবহৃত হলে, আপনি একটি সিস্টেম ডিজাইন করতে পারেন যা দ্রুত এবং স্কেলেবল, এবং এর ডেটার ইতিহাস সংরক্ষণ এবং পুনরুদ্ধার করা সম্ভব।
Content added By

স্কালা মেসেজ পাসিং সিস্টেম সাধারণত Akka লাইব্রেরি বা স্কালার অ্যাক্টর মডেল (Actor Model) ব্যবহার করে তৈরি করা হয়। মেসেজ পাসিং সিস্টেম হল একটি ডিস্ট্রিবিউটেড বা কনকারেন্ট সিস্টেম যেখানে বিভিন্ন অ্যাক্টর বা কম্পোনেন্ট একে অপরের মধ্যে মেসেজ পাস করে, এবং প্রতিটি অ্যাক্টর তাদের নিজস্ব স্টেট এবং কার্যক্ষমতা বজায় রেখে কাজ করে। এটি অ্যাক্টর মডেল-এর মূল ধারণা, যেখানে মেসেজ পাসিংয়ের মাধ্যমে অ্যাক্টররা যোগাযোগ করে এবং একে অপরের অবস্থান বা স্টেটকে পরিবর্তন না করে একে অপরের সাথে সমন্বয় করে কাজ করে।

স্কালায় Akka লাইব্রেরি এবং Akka Actor Model ব্যবহারের মাধ্যমে মেসেজ পাসিং সিস্টেম তৈরি করা সহজ এবং কার্যকরী হয়।

এই লেখায় আমরা Akka Actor Model এর মাধ্যমে স্কালায় মেসেজ পাসিং সিস্টেমের কিভাবে কাজ করে তা উদাহরণসহ জানবো।


১. Akka Actor Model

Akka Actor Model ব্যবহার করে স্কালায় মেসেজ পাসিং সিস্টেম তৈরি করা হয়, যেখানে অ্যাক্টর একটি ইনস্ট্যান্স হিসেবে কাজ করে এবং এটি মেসেজ গ্রহণ করে তার স্টেট পরিবর্তন করতে পারে। একটি অ্যাক্টর শুধুমাত্র নিজেই মেসেজ হ্যান্ডল করতে সক্ষম এবং অন্যান্য অ্যাক্টরের স্টেট পরিবর্তন করতে পারে না।

মূল ধারণা:

  • অ্যাক্টর: একটি অ্যাক্টর হল একটি স্বতন্ত্র ইউনিট যা ইনপুট (মেসেজ) গ্রহণ করে এবং তার স্টেট পরিবর্তন করে। অ্যাক্টরের জন্য মূল কার্যকলাপ হল মেসেজ হ্যান্ডলিং।
  • মেসেজ পাসিং: অ্যাক্টররা একে অপরকে মেসেজ পাস করে এবং পরবর্তীতে তাদের কাজ চালিয়ে যায়। মেসেজ একবার পাস করার পর, একে অন্য অ্যাক্টরের স্টেটের উপর কোনো প্রভাব পড়ে না, বরং প্রক্রিয়া একে অপরের মধ্যে সম্পূর্ণ বিচ্ছিন্নভাবে ঘটে।

২. Akka Actor Setup এবং ব্যবহার

এখানে একটি সাধারণ Akka অ্যাক্টর সেটআপ এবং মেসেজ পাসিং সিস্টেমের উদাহরণ দেয়া হলো।

২.১ Akka Actor Dependency যোগ করা

প্রথমে, আপনার build.sbt ফাইলে Akka Actor লাইব্রেরি যোগ করুন:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.6.10"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.6.10" % Test

২.২ Akka Actor উদাহরণ

এই উদাহরণে, আমরা একটি Greeter অ্যাক্টর তৈরি করব যা একটি গ্রীটিং মেসেজ গ্রহণ করবে এবং তারপরে এটি প্রতিক্রিয়া হিসেবে মেসেজ পাঠাবে।

import akka.actor.{Actor, ActorSystem, Props}

// Greeter actor
class Greeter extends Actor {
  def receive = {
    case "greet" => 
      println("Hello, Actor Model!")
      sender() ! "greeting sent"  // Responding with a message
    case _ => 
      println("Unknown message")
  }
}

object AkkaActorExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("actor-system")
    
    // Create an instance of Greeter actor
    val greeter = system.actorOf(Props[Greeter], "greeter-actor")
    
    // Send a "greet" message to the Greeter actor
    greeter ! "greet"
    
    // Send a message that the actor does not understand
    greeter ! "unknown"
    
    // Stop the actor system gracefully
    system.terminate()
  }
}

এখানে:

  • Greeter ক্লাস হল একটি actor যা একটি receive মেথড দিয়ে মেসেজ গ্রহণ করে এবং উপযুক্ত প্রতিক্রিয়া প্রদান করে।
  • actorOf মেথড ব্যবহার করে অ্যাক্টর তৈরি করা হয়েছে।
  • greeter ! "greet": এই লাইনটি মেসেজ "greet" অ্যাক্টরের কাছে পাঠাচ্ছে।

২.৩ মেসেজ পাসিং

Akka-তে মেসেজ পাসিং খুবই সরল। আপনি শুধুমাত্র অ্যাক্টরের নাম এবং মেসেজ পাঠালেই, অ্যাক্টর তা গ্রহণ করে প্রক্রিয়া করবে। এটি এস্কেপাল টাইম এর মধ্যে হয়, এবং কখনও একাধিক অ্যাক্টর একে অপরকে মেসেজ পাঠায়।

এখানে:

  • sender(): এটি অ্যাক্টরের মাধ্যমে বর্তমান sender-এর রেফারেন্স গ্রহণ করে, যাতে প্রতিক্রিয়া পাঠানো যায়।
  • ! (অ্যাক্টর চিহ্ন) দিয়ে মেসেজ পাঠানো হয়।

৩. Akka মেসেজ পাসিং অ্যাপ্লিকেশন উদাহরণ

ধরা যাক, আমাদের দুটি অ্যাক্টর রয়েছে:

  1. SenderActor: এটি মেসেজ পাঠানোর দায়িত্ব পালন করবে।
  2. ReceiverActor: এটি মেসেজ গ্রহণ করবে এবং প্রক্রিয়া করবে।
import akka.actor.{Actor, ActorSystem, Props}

// Sender Actor
class SenderActor(receiver: akka.actor.ActorRef) extends Actor {
  def receive = {
    case "start" => 
      println("Sender sending message to Receiver")
      receiver ! "Hello, Receiver!"
  }
}

// Receiver Actor
class ReceiverActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"Receiver received message: $msg")
  }
}

object AkkaMessagePassing {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Message-Passing-System")
    
    // Create Receiver actor
    val receiver = system.actorOf(Props[ReceiverActor], "receiver")
    
    // Create Sender actor
    val sender = system.actorOf(Props(new SenderActor(receiver)), "sender")
    
    // Start message passing
    sender ! "start"
    
    // Gracefully terminate the system
    system.terminate()
  }
}

এখানে:

  • SenderActor মেসেজ পাঠায় এবং ReceiverActor মেসেজ গ্রহণ করে এবং প্রিন্ট করে।
  • receiver ! "Hello, Receiver!": Sender actor থেকে Receiver actor-এ মেসেজ পাঠানো হচ্ছে।

Output:

Sender sending message to Receiver
Receiver received message: Hello, Receiver!

৪. Akka মেসেজ পাসিং এবং আন্ডারলাইন

  • অ্যাক্টর মডেল স্কালায় মেসেজ পাসিং ব্যবস্থার মাধ্যমে কনকারেন্ট প্রোগ্রামিং খুব সহজ করে তোলে।
  • একাধিক অ্যাক্টর একে অপরকে মেসেজ পাঠানোর মাধ্যমে ডিস্ট্রিবিউটেড সিস্টেম তৈরি করা যায় এবং অ্যাক্টরের মধ্যে মেসেজ সিঙ্ক্রোনাস এবং অ্যাসিঙ্ক্রোনাসভাবে পাস করা যায়।

৫. মেসেজ পাসিং সিস্টেমের সুবিধা

  • ডিস্ট্রিবিউটেড এবং কনকারেন্ট প্রোগ্রামিং: মেসেজ পাসিং সিস্টেমের মাধ্যমে একাধিক অ্যাক্টর একে অপরের সাথে পারফেক্টলি কমিউনিকেশন করতে পারে, যা ডিস্ট্রিবিউটেড সিস্টেম এবং কনকারেন্ট প্রোগ্রামিংয়ে সহায়তা করে।
  • ডিকাপলিং: অ্যাক্টররা একে অপরের স্টেটের সাথে সরাসরি যোগাযোগ না করে মেসেজ পাস করে, ফলে কোডের গঠন সহজ হয় এবং কমপ্লেক্সিটি কমে।
  • স্কেলেবিলিটি: মেসেজ পাসিং সিস্টেমে নতুন অ্যাক্টর যোগ করা এবং স্কেল করা সহজ, যা ডিস্ট্রিবিউটেড সিস্টেমের জন্য উপযোগী।

সারাংশ

Akka Actor Model এবং Akka HTTP স্কালায় মেসেজ পাসিং সিস্টেমের জন্য অত্যন্ত শক্তিশালী ফ্রেমওয়ার্ক, যা ডিস্ট্রিবিউটেড সিস্টেম এবং কনকারেন্ট প্রোগ্রামিং তৈরিতে সহায়ক। Akka অ্যাক্টর মডেল ব্যবহার করে মেসেজ পাসিং সিস্টেম তৈরি করে কোডকে আরও নমনীয়, ডিকাপলড, এবং স্কেলেবল করা যায়।

Content added By
Promotion

Are you sure to start over?

Loading...