স্কালা ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং ব্যবস্থাপনা তৈরি করার জন্য একটি শক্তিশালী ভাষা। এটি Akka এবং Apache Kafka এর মতো লাইব্রেরি এবং ফ্রেমওয়ার্ক সমর্থন করে, যা ডিস্ট্রিবিউটেড সিস্টেমের কনসেপ্টগুলো কার্যকরভাবে পরিচালনা করতে সাহায্য করে। এই টিউটোরিয়ালে, আমরা Akka এবং Kafka ব্যবহার করে স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং কনসেপ্ট নিয়ে আলোচনা করব।
১. Akka - Distributed Systems and Actor Model
Akka একটি উচ্চ কার্যক্ষম এবং স্কেলেবল ফ্রেমওয়ার্ক যা স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং পরিচালনার জন্য ব্যবহৃত হয়। Akka Actor Model ডিস্ট্রিবিউটেড সিস্টেমের জন্য খুবই উপযোগী, যেখানে বিভিন্ন অ্যাক্টর একে অপরের সাথে মেসেজ পাসের মাধ্যমে যোগাযোগ করে।
১.১ Akka Actor Model
Akka Actor Model মেসেজিং এবং কনকারেন্সি পরিচালনার জন্য এক্সট্রা-লাইটweight থ্রেড ব্যবহার করে অ্যাক্টর তৈরি এবং পরিচালনা করে। প্রতিটি অ্যাক্টর একটি একক থ্রেডে চলমান থাকে এবং অন্য অ্যাক্টরের সাথে মেসেজ পাস করে কাজ করে।
Akka Actor Example
import akka.actor.{Actor, ActorSystem, Props}
// Define Actor class
class MyActor extends Actor {
def receive: Receive = {
case "ping" => println("Pong")
case _ => println("Unknown message")
}
}
object AkkaExample {
def main(args: Array[String]): Unit = {
val system = ActorSystem("MyActorSystem")
val myActor = system.actorOf(Props[MyActor], "myActor")
myActor ! "ping" // Send a message to the actor
myActor ! "hello" // Send an unknown message
system.terminate() // Shutdown the ActorSystem
}
}এখানে:
- Actor একটি সিস্টেমের একক ইউনিট, যা
receiveমেথডের মাধ্যমে মেসেজ হ্যান্ডল করে। - ActorSystem ব্যবহারের মাধ্যমে অ্যাক্টর তৈরি এবং পরিচালনা করা হচ্ছে।
১.২ Akka Cluster for Distributed Systems
Akka Cluster ব্যবহার করে একাধিক নোডের মধ্যে অ্যাক্টরস তৈরি এবং মেসেজ পাস করা যায়, যা একটি ডিস্ট্রিবিউটেড সিস্টেম তৈরি করে।
Akka Cluster Setup Example:
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.Props
import akka.actor.ActorSystem
class ClusterListener extends Actor {
val cluster = Cluster(context.system)
def receive = {
case MemberUp(member) =>
println(s"Member is Up: ${member.address}")
case MemberRemoved(member, previousStatus) =>
println(s"Member removed: ${member.address} with status $previousStatus")
}
}
object AkkaClusterExample {
def main(args: Array[String]): Unit = {
val system = ActorSystem("ClusterSystem")
val clusterListener = system.actorOf(Props[ClusterListener], name = "clusterListener")
println("Cluster system started!")
}
}এখানে:
- Akka Cluster ব্যবহৃত হয়েছে, যা মেসেজ পাস করার জন্য একাধিক নোডের মধ্যে অ্যাক্টরস নিয়ে একটি ডিস্ট্রিবিউটেড সিস্টেম তৈরি করে।
- ClusterListener অ্যাক্টরটি সিস্টেমে যোগ হওয়া এবং বাদ পড়া মেম্বারস মনিটর করে।
২. Apache Kafka - Messaging and Distributed Data Streaming
Apache Kafka একটি ডিসট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত মেসেজ পাসিং, লগ সংগ্রহ, এবং ডেটা স্ট্রিমিংয়ের জন্য ব্যবহৃত হয়। এটি উচ্চ কার্যক্ষমতা, স্কেলেবিলিটি এবং প্রোটোকল সহজতার জন্য অত্যন্ত জনপ্রিয়।
২.১ Kafka Producer and Consumer
Kafka ক্লায়েন্টের মাধ্যমে ডেটা প্রযোজনা এবং গ্রহণ করা যায়। Producer ডেটা স্ট্রীম পাঠানোর কাজ করে এবং Consumer সেই ডেটা গ্রহণ করে।
Kafka Producer Example
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
object KafkaProducerExample {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for (i <- 1 to 10) {
val message = s"Message $i"
val record = new ProducerRecord[String, String]("test-topic", "key", message)
producer.send(record)
println(s"Sent: $message")
}
producer.close()
}
}এখানে:
- KafkaProducer ব্যবহার করে একটি মেসেজ test-topic টপিকে পাঠানো হয়েছে।
Kafka Consumer Example
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, Collections}
object KafkaConsumerExample {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("test-topic"))
while (true) {
val records = consumer.poll(1000) // Poll every second
records.forEach(record => println(s"Consumed: ${record.value()}"))
}
}
}এখানে:
- KafkaConsumer ব্যবহার করে test-topic থেকে মেসেজগুলো গ্রহণ করা হয়েছে।
consumer.poll()মেথডের মাধ্যমে কনজিউমার মেসেজ পেতে পোল করছে।
২.২ Kafka and Akka Integration
Akka এবং Kafka একসাথে ব্যবহৃত হতে পারে ডিস্ট্রিবিউটেড সিস্টেমে মেসেজ পাসিং এবং ডেটা স্ট্রিমিংয়ের জন্য। Akka Streams এবং Kafka একত্রে ব্যবহার করে একটি স্কেলেবল মেসেজিং সিস্টেম তৈরি করা যেতে পারে।
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kafka.scaladsl.Consumer
import akka.stream.alpakka.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
object AkkaKafkaIntegration {
implicit val system = ActorSystem("AkkaKafkaIntegration")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("test-group")
def main(args: Array[String]): Unit = {
val control = Consumer.plainSource(consumerSettings, Subscriptions.topics("test-topic"))
.mapAsync(1)(msg => {
println(s"Consumed message: ${msg.value()}")
scala.concurrent.Future.successful(msg)
})
.toMat(akka.stream.Sink.ignore)(akka.stream.Materializer.materializer)
.run()
println("Consuming messages from Kafka")
}
}এখানে:
- Akka Streams এবং Alpakka Kafka ব্যবহার করে Kafka Consumer তৈরি করা হয়েছে। এটি একে একে মেসেজগুলো গ্রহণ এবং প্রসেস করছে।
সারাংশ
স্কালায় ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং ব্যবস্থাপনা করার জন্য শক্তিশালী ফ্রেমওয়ার্ক এবং লাইব্রেরি রয়েছে। Akka এফেক্টিভ মেসেজ পাসিং এবং কনকারেন্সি সাপোর্টের জন্য ব্যবহার করা হয়, এবং Kafka ডিস্ট্রিবিউটেড ডেটা স্ট্রিমিং এবং মেসেজিং জন্য খুবই জনপ্রিয়। এই ফ্রেমওয়ার্কগুলো আপনাকে স্কেলেবল এবং পারফরম্যান্ট ডিস্ট্রিবিউটেড সিস্টেম এবং মেসেজিং সিস্টেম তৈরি করতে সাহায্য করে।
Kafka একটি উচ্চ পারফরম্যান্স, স্কেলেবল, এবং ডিস্ট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা মূলত লগ ডাটা বা স্ট্রিমিং ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়। স্কালার মধ্যে Kafka ব্যবহার করতে, আপনাকে কিছু নির্দিষ্ট লায়ব্রেরি এবং ডিপেন্ডেন্সি যুক্ত করতে হবে, যেমন Apache Kafka এবং Akka Streams।
এই গাইডে, আমরা স্কালায় Kafka ব্যবহার করার জন্য প্রয়োজনীয় স্টেপগুলি ব্যাখ্যা করব, যেমন Kafka প্রডিউসার (Producer) এবং কনজিউমার (Consumer) তৈরি করা এবং Kafka এর মাধ্যমে ডাটা প্রেরণ করা।
১. Kafka এবং sbt ডিপেন্ডেন্সি যুক্ত করা
স্কালায় Kafka ব্যবহার করতে, প্রথমে আপনাকে Kafka Client Library এর ডিপেন্ডেন্সি sbt (Scala Build Tool) ফাইলে যোগ করতে হবে।
sbt ফাইলে Kafka ডিপেন্ডেন্সি যুক্ত করা:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.8.0"kafka-clients: Kafka এর ক্লায়েন্ট লাইব্রেরি যা প্রডিউসার এবং কনজিউমারের জন্য ব্যবহৃত হয়।kafka: Scala Kafka লাইব্রেরি।
২. Kafka প্রডিউসার তৈরি করা (Producer)
Kafka প্রডিউসার একটি অ্যাপ্লিকেশন যা ডাটা Kafka টপিকে পাঠায়। এখানে আমরা একটি প্রডিউসার তৈরি করব যা একটি টপিকে বার্তা প্রেরণ করবে।
Kafka প্রডিউসার কোড:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
object KafkaProducerExample {
def main(args: Array[String]): Unit = {
// Kafka প্রডিউসার কনফিগারেশন
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Kafka সার্ভার
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Kafka প্রডিউসার তৈরি করা
val producer = new KafkaProducer[String, String](props)
// Kafka টপিকের জন্য বার্তা পাঠানো
val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka!")
producer.send(record) // বার্তা পাঠানো
println("Message sent successfully!")
// প্রডিউসার বন্ধ করা
producer.close()
}
}এখানে:
props.put: Kafka সার্ভারের ঠিকানা এবং সিরিয়ালাইজার সেট করা হয়েছে।ProducerRecord: Kafka টপিক এবং বার্তা তৈরি করা হয়েছে।producer.send(record): বার্তা Kafka টপিকে পাঠানো হয়েছে।
টপিক তৈরি:
Kafka টপিক তৈরি করতে আপনি Kafka সার্ভারের কমান্ড লাইন টুল ব্যবহার করতে পারেন:
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1৩. Kafka কনজিউমার তৈরি করা (Consumer)
Kafka কনজিউমার একটি অ্যাপ্লিকেশন যা Kafka টপিক থেকে বার্তা গ্রহণ করে। নিচে একটি কনজিউমার তৈরি করা হয়েছে যা test-topic টপিক থেকে বার্তা নিয়ে আসে।
Kafka কনজিউমার কোড:
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import java.util.{Properties, ConsumerRecord}
import java.util.Collections
object KafkaConsumerExample {
def main(args: Array[String]): Unit = {
// Kafka কনজিউমার কনফিগারেশন
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Kafka সার্ভার
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// Kafka কনজিউমার তৈরি করা
val consumer = new KafkaConsumer[String, String](props)
// টপিক সাবস্ক্রাইব করা
consumer.subscribe(Collections.singletonList("test-topic"))
// বার্তা পড়া
while (true) {
val records = consumer.poll(1000) // 1 সেকেন্ড পর্যন্ত অপেক্ষা করবে
for (record: ConsumerRecord[String, String] <- records.asScala) {
println(s"Consumed record: key = ${record.key}, value = ${record.value}, partition = ${record.partition}, offset = ${record.offset}")
}
}
}
}এখানে:
props.put: কনজিউমারের কনফিগারেশন সেট করা হয়েছে।consumer.subscribe: কনজিউমারকেtest-topicটপিক সাবস্ক্রাইব করানো হয়েছে।consumer.poll: বার্তা নিয়ে আসার জন্য ব্যবহৃত হয়।
কনজিউমার চলমান থাকলে:
- কনজিউমার প্রাপ্ত বার্তা স্ক্যান করবে এবং কনসোল এ প্রিন্ট করবে।
৪. ফিউচার এবং Kafka
স্কালার অ্যাসিনক্রোনাস প্রোগ্রামিংয়ের জন্য ফিউচার ব্যবহৃত হয়। Kafka প্রডিউসার এবং কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করতে ফিউচার ব্যবহার করা যেতে পারে।
উদাহরণ: ফিউচার ব্যবহার করে Kafka প্রডিউসার
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object KafkaProducerWithFuture {
def main(args: Array[String]): Unit = {
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val future = Future {
val record = new ProducerRecord[String, String]("test-topic", "key", "Hello, Kafka with Future!")
producer.send(record)
println("Message sent successfully!")
}
// Wait for future completion
future.onComplete {
case scala.util.Success(_) => println("Message sent and future completed.")
case scala.util.Failure(e) => println(s"Failed: ${e.getMessage}")
}
// Producer close
producer.close()
}
}এখানে:
Futureব্যবহৃত হয়েছে যাতে প্রডিউসারের কাজ অ্যাসিনক্রোনাসভাবে করা যায় এবং ভবিষ্যতে তার ফলাফল পাওয়া যায়।
৫. Kafka Streams এবং Akka Streams
Play Framework এবং Akka Streams এর মাধ্যমে Kafka Streams ব্যবহৃত হতে পারে, যা স্কালায় ডাটা স্ট্রিমিং প্রক্রিয়াকরণের জন্য আরো সুবিধাজনক। এটি উচ্চ পারফরম্যান্স এবং স্ট্রিমিং ডাটা হ্যান্ডলিংয়ের জন্য ব্যবহৃত হয়।
সারাংশ
- Kafka প্রডিউসার: ডাটা Kafka টপিকে পাঠায়।
- Kafka কনজিউমার: Kafka টপিক থেকে ডাটা গ্রহণ করে।
- ফিউচার: Kafka প্রডিউসার বা কনজিউমারকে অ্যাসিনক্রোনাসভাবে পরিচালনা করার জন্য ফিউচার ব্যবহৃত হয়।
- Kafka Streams: এটি একটি উন্নত স্ট্রিমিং লাইব্রেরি, যা Kafka ডাটা প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়।
Kafka স্কালায় ব্যবহার করতে হলে, Kafka ক্লায়েন্ট লাইব্রেরি এবং প্রপার কনফিগারেশন ব্যবহার করে ডাটা প্রেরণ এবং গ্রহণ করা যেতে পারে।
Akka হল একটি শক্তিশালী ফ্রেমওয়ার্ক যা Actor Model ব্যবহার করে কনকারেন্ট, ডিসট্রিবিউটেড, এবং রেসিলিয়েন্ট অ্যাপ্লিকেশন তৈরি করতে ব্যবহৃত হয়। Akka Cluster এবং Akka Remote দুটি ফিচার প্রদান করে, যা ডিস্ট্রিবিউটেড অ্যাপ্লিকেশন তৈরির জন্য অত্যন্ত উপযোগী। এই দুটি ফিচার ব্যবহার করে আপনি আপনার অ্যাক্টরদের একাধিক নোডে বা মেশিনে স্থাপন করতে পারেন এবং তারা একে অপরের সাথে যোগাযোগ করতে পারে।
১. Akka Cluster
Akka Cluster একটি ডিসট্রিবিউটেড সিস্টেম তৈরি করতে সাহায্য করে, যেখানে একাধিক নোড বা মেশিনের মধ্যে অ্যাক্টররা একে অপরের সাথে যোগাযোগ করতে পারে। এটি failover এবং scalability সুবিধা প্রদান করে। Akka Cluster আপনাকে ডিস্ট্রিবিউটেড অ্যাপ্লিকেশন তৈরির জন্য নিম্নলিখিত সুবিধা প্রদান করে:
- Clustering: একাধিক নোডের মধ্যে অ্যাক্টরগুলিকে যুক্ত করা।
- Failover: এক নোড ব্যর্থ হলে অন্য নোড থেকে অ্যাক্টর পুনরুদ্ধার করা।
- Eventual Consistency: নোডগুলির মধ্যে ডেটা সমন্বয় করা।
- Shard: একাধিক নোডের মধ্যে অ্যাক্টরদের শার্ড করা (Shard-based routing)।
১.১ Akka Cluster কনফিগারেশন
Akka Cluster ব্যবহার করতে হলে আপনাকে application.conf ফাইলে কিছু কনফিগারেশন করতে হবে। এখানে একটি সাধারণ Akka Cluster কনফিগারেশন দেখানো হলো:
akka {
actor {
provider = "cluster" // Enable Akka Cluster
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"
]
auto-down-unreachable-after = 10s
}
}এখানে:
hostnameএবংportসেট করা হয়েছে, যা নোডের ঠিকানা নির্ধারণ করে।seed-nodesহল সিড নোডগুলির তালিকা, যেগুলি ক্লাস্টারের অন্যান্য নোডগুলোকে জানায়।auto-down-unreachable-afterসেকেন্ডে সেট করা হয়েছে, যার মাধ্যমে একটি নোড যদি দীর্ঘ সময়ের জন্য পৌঁছাতে না পারে, তবে তা ডাউন হিসেবে চিহ্নিত হবে।
১.২ Akka Cluster কোড উদাহরণ
এখন আমরা একটি সিম্পল Akka Cluster উদাহরণ তৈরি করি যেখানে দুইটি নোড অ্যাক্টরদের মধ্যে যোগাযোগ করবে।
ClusterApp.scala:
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{MemberUp, MemberEvent}
class ClusterListener extends Actor {
val cluster = Cluster(context.system)
// Subscribe to cluster changes
cluster.subscribe(self, classOf[MemberEvent])
def receive = {
case MemberUp(member) =>
println(s"Member is Up: ${member.address}")
case _ => // Ignore other messages
}
}
object ClusterApp extends App {
val system = ActorSystem("ClusterSystem")
val listener = system.actorOf(Props[ClusterListener], name = "clusterListener")
// Join the cluster
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
// Keep the application running
println("Cluster system is running.")
}এখানে:
- ClusterListener অ্যাক্টরটি ক্লাস্টারের সদস্যদের পরিবর্তন দেখবে এবং সদস্যদের
MemberUpস্টেটাস মেসেজ গ্রহণ করবে। cluster.join(cluster.selfAddress)এর মাধ্যমে নোডটি ক্লাস্টারে যোগদান করবে।
২. Akka Remote
Akka Remote আপনাকে আপনার অ্যাক্টরদের একাধিক মেশিনে বা নোডে চালানোর ক্ষমতা দেয় এবং তারা একে অপরের সাথে যোগাযোগ করতে পারে। Akka Remote ব্যবহৃত হয় যখন আপনি একাধিক অ্যাক্টর সিস্টেমকে আলাদা মেশিনে বা প্রক্রিয়ায় চালাতে চান।
২.১ Akka Remote কনফিগারেশন
Akka Remote ব্যবহার করার জন্য আপনাকে application.conf ফাইলে কনফিগারেশন করতে হবে।
akka {
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
}এখানে, hostname এবং port সেট করা হয়েছে, যা নির্দিষ্ট করে দেয় যে এই অ্যাক্টর সিস্টেম কোন মেশিনে চলবে এবং কোন পোর্টে এটি অ্যাক্সেস করা যাবে।
২.২ Akka Remote উদাহরণ
এখন একটি উদাহরণ দেখানো হলো যেখানে Akka Remote ব্যবহার করা হচ্ছে দুটি আলাদা সিস্টেমের মধ্যে যোগাযোগের জন্য।
RemoteSender.scala (Sender Side):
import akka.actor.{Actor, ActorSystem, Props}
import akka.remote.RemoteScope
class RemoteActor extends Actor {
def receive = {
case msg: String => println(s"Received message: $msg")
}
}
object RemoteSender extends App {
val system = ActorSystem("RemoteSystem")
val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")
// Send a message to the remote actor
remoteActor ! "Hello from remote system"
}RemoteReceiver.scala (Receiver Side):
import akka.actor.{Actor, ActorSystem, Props}
class RemoteActor extends Actor {
def receive = {
case msg: String => println(s"Received message: $msg")
}
}
object RemoteReceiver extends App {
val system = ActorSystem("RemoteSystem")
val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")
// Wait for messages
}এখানে:
- RemoteSender অ্যাক্টরটি একটি মেসেজ "Hello from remote system" পাঠাচ্ছে।
- RemoteReceiver অ্যাক্টরটি রিসিভ করে মেসেজটি প্রিন্ট করবে।
৩. Akka Cluster এবং Remote-এর সুবিধা
- Scalability: Akka Cluster ব্যবহার করে আপনি সহজেই অ্যাপ্লিকেশন স্কেল করতে পারেন। একাধিক নোড যোগ করে আপনি আপনার সিস্টেমকে স্কেল করতে পারবেন।
- Fault Tolerance: Akka Cluster অটোমেটিকালি ব্যর্থ নোডকে চিনতে পারে এবং সিস্টেম পুনরুদ্ধার করতে পারে।
- High Availability: Akka Remote এবং Cluster ব্যবহার করে আপনার অ্যাপ্লিকেশন সর্বদা উপলব্ধ থাকবে, কারণ অ্যাক্টররা বিভিন্ন নোডের মধ্যে যোগাযোগ করতে সক্ষম।
- Distributable Actors: আপনি আপনার অ্যাক্টরগুলোকে বিভিন্ন সিস্টেমে বিতরণ করতে পারবেন, যা আপনাকে বড় সিস্টেম ডিজাইন করতে সাহায্য করবে।
সারাংশ
- Akka Cluster হল একটি ডিসট্রিবিউটেড সিস্টেম তৈরি করার জন্য ব্যবহৃত শক্তিশালী ফিচার যা একাধিক নোডের মধ্যে অ্যাক্টরদের মধ্যে যোগাযোগ করতে সক্ষম।
- Akka Remote আপনাকে আপনার অ্যাক্টর সিস্টেমকে বিভিন্ন সিস্টেম বা নোডে রান করতে সক্ষম করে এবং তাদের মধ্যে যোগাযোগ স্থাপন করতে সাহায্য করে।
- Akka এর মাধ্যমে আপনি কনকারেন্ট, ডিসট্রিবিউটেড, এবং স্কেলেবল সিস্টেম তৈরি করতে পারেন যা অত্যন্ত কার্যকর এবং উচ্চ পারফরম্যান্স প্রদান করে।
Akka Cluster এবং Remote ব্যবহারের মাধ্যমে আপনি একটি সিস্টেমের বিভিন্ন অংশে একাধিক অ্যাক্টর সিস্টেমকে সিঙ্ক্রোনাইজ করতে সক্ষম হবেন, যা বড় এবং স্কেলেবল অ্যাপ্লিকেশন তৈরি করতে সাহায্য করে।
Event Sourcing এবং CQRS (Command Query Responsibility Segregation) হল দুটি শক্তিশালী আর্কিটেকচারাল প্যাটার্ন, যা একসাথে ব্যবহৃত হলে স্কালার সিস্টেমকে আরও স্কেলেবল, পারফরম্যান্স অপটিমাইজড, এবং রিঅ্যাকটিভ করতে সাহায্য করে। এই দুটি প্যাটার্ন রিঅ্যাকটিভ ডিস্ট্রিবিউটেড সিস্টেম ডিজাইনে জনপ্রিয়, যেখানে ডেটার স্টেট পরিবর্তন এবং পড়া আলাদা করা হয় এবং ডেটার ইতিহাস সংরক্ষিত থাকে।
১. Event Sourcing
Event Sourcing হল এমন একটি আর্কিটেকচারাল প্যাটার্ন, যেখানে অ্যাপ্লিকেশন স্টেট পরিবর্তন সরাসরি ডাটাবেসে সংরক্ষণ না করে ইভেন্ট হিসেবে সংরক্ষণ করা হয়। প্রতিটি স্টেট পরিবর্তন একটি ইভেন্ট হিসেবে ধরে রাখা হয় এবং যখনই বর্তমান স্টেট প্রয়োজন হয়, তখন পুরানো ইভেন্টগুলিকে পুনরায় খেলা (replay) করা হয়। এর মাধ্যমে সিস্টেমের স্টেটের পূর্ণ ইতিহাস ট্র্যাক করা যায় এবং কোন সময়ে স্টেট হারিয়ে গেলেও, পুরনো ইভেন্টগুলির মাধ্যমে সঠিক স্টেট পুনরুদ্ধার করা সম্ভব হয়।
১.১ Event Sourcing এর মূল উপাদান:
- ইভেন্ট: এটি একটি কার্যকলাপ বা স্টেট পরিবর্তন যা সিস্টেমে ঘটেছে।
- ইভেন্ট স্টোর: সমস্ত ইভেন্ট সংরক্ষিত থাকে। এটি ঐতিহাসিক ডেটা যা পুনরায় খেলা যেতে পারে।
- স্টেট পুনঃপ্রতিষ্ঠা: ডেটার বর্তমান স্টেট ইভেন্টগুলির replay মাধ্যমে পুনরুদ্ধার করা হয়।
১.২ Event Sourcing উদাহরণ
sealed trait AccountEvent
case class Deposit(amount: Int) extends AccountEvent
case class Withdraw(amount: Int) extends AccountEvent
case class Account(id: String, balance: Int)
object EventSourcingExample {
def handleEvent(account: Account, event: AccountEvent): Account = event match {
case Deposit(amount) => account.copy(balance = account.balance + amount)
case Withdraw(amount) => account.copy(balance = account.balance - amount)
}
def main(args: Array[String]): Unit = {
var account = Account("123", 0)
val events: List[AccountEvent] = List(Deposit(100), Withdraw(50), Deposit(200))
events.foreach(event => account = handleEvent(account, event))
println(account) // Output: Account(123,250)
}
}এখানে:
DepositএবংWithdrawইভেন্টগুলো Account অবজেক্টের স্টেট পরিবর্তন করে।- ইভেন্টগুলো replay করে অ্যাকাউন্টের বর্তমান ব্যালেন্স পুনরুদ্ধার করা হয়েছে।
২. CQRS (Command Query Responsibility Segregation)
CQRS হল একটি আর্কিটেকচার প্যাটার্ন যেখানে সিস্টেমের Command এবং Query অপারেশনগুলোকে আলাদা করা হয়।
- Command: সিস্টেমের স্টেট পরিবর্তন করা, যেমন ডেটা আপডেট বা ইনসার্ট করা।
- Query: সিস্টেমের স্টেট পড়া, যেমন ডেটা ফেচ করা বা কুয়েরি চালানো।
CQRS প্যাটার্নে, Write Model এবং Read Model আলাদা রাখা হয়। Command অপারেশনগুলি স্টেট পরিবর্তন করে, এবং Query অপারেশনগুলি শুধুমাত্র ডেটা পড়ে, স্টেট পরিবর্তন না করে।
২.১ CQRS এর সুবিধা:
- পারফরম্যান্স অপটিমাইজেশন: কুয়েরি এবং কমান্ড অপারেশন আলাদা করার মাধ্যমে পারফরম্যান্স উন্নত করা যায়। কুয়েরি অপারেশনগুলিকে অপটিমাইজড এবং ফাস্ট করা যায়।
- স্কেলেবিলিটি: পৃথক কুয়েরি এবং কমান্ড মডেল স্কেলিং সহজ করে তোলে, যেমন ডেটাবেসে পৃথক ইনডেক্সিং বা ডেটাবেস শার্ডিং।
- সাধারণ সিস্টেমের কার্যকারিতা বৃদ্ধি: একে অপরের ওপর নির্ভরশীল না হওয়ার কারণে সিস্টেমের লোড সঠিকভাবে পরিচালনা করা যায়।
২.২ CQRS উদাহরণ
// Command Side - Write Model
case class Command(id: String, amount: Int)
class CommandHandler {
def handleCommand(command: Command): String = {
// Process command and perform write operation
s"Processed command with id: ${command.id} and amount: ${command.amount}"
}
}
// Query Side - Read Model
case class Query(id: String)
class QueryHandler {
def handleQuery(query: Query): String = {
// Process query and fetch data
s"Fetched data for query with id: ${query.id}"
}
}
object CQRSExample {
def main(args: Array[String]): Unit = {
val commandHandler = new CommandHandler
val queryHandler = new QueryHandler
// Handling command
val commandResult = commandHandler.handleCommand(Command("123", 100))
println(commandResult) // Output: Processed command with id: 123 and amount: 100
// Handling query
val queryResult = queryHandler.handleQuery(Query("123"))
println(queryResult) // Output: Fetched data for query with id: 123
}
}এখানে:
- CommandHandler সিস্টেমের স্টেট পরিবর্তন করে, যেমন ডেটা ইনসার্ট বা আপডেট।
- QueryHandler শুধুমাত্র ডেটা পড়ার জন্য ব্যবহৃত হয়, ডেটা পরিবর্তন নয়।
৩. Event Sourcing এবং CQRS একসাথে ব্যবহৃত হলে
এখন যদি Event Sourcing এবং CQRS একসাথে ব্যবহৃত হয়, তবে আপনি সিস্টেমের স্টেট পরিবর্তন ইভেন্ট হিসেবে সংরক্ষণ করবেন এবং সেই ইভেন্টগুলো replay করে স্টেট পুনরুদ্ধার করবেন। সিস্টেমের Command এবং Query অপারেশন আলাদা করা হবে, যা পারফরম্যান্স এবং স্কেলেবিলিটি আরও উন্নত করবে।
৩.১ Event Sourcing এবং CQRS একসাথে উদাহরণ
// Event Sourcing with CQRS
case class OrderCreated(id: String, item: String, quantity: Int)
class CommandHandler {
def handleCreateOrder(order: OrderCreated): Unit = {
// Save the event to event store
println(s"Saving event: OrderCreated(${order.id}, ${order.item}, ${order.quantity})")
}
}
class QueryHandler {
def getOrder(id: String): String = {
// Fetch order details from read model
s"Fetching order details for id: $id"
}
}
object EventSourcingCQRSExample {
def main(args: Array[String]): Unit = {
val commandHandler = new CommandHandler
val queryHandler = new QueryHandler
// Command: Create an order
val order = OrderCreated("123", "Laptop", 1)
commandHandler.handleCreateOrder(order)
// Query: Fetch the order
val orderDetails = queryHandler.getOrder("123")
println(orderDetails) // Output: Fetching order details for id: 123
}
}এখানে:
- CommandHandler ইভেন্টটি ডাটাবেসে সংরক্ষণ করে (এটি বাস্তবে ইভেন্ট স্টোর হতে পারে)।
- QueryHandler ডেটা থেকে বিস্তারিত প্রাপ্তি করে, যা আলাদা read model থেকে এসেছে।
সারাংশ
- Event Sourcing হল একটি প্যাটার্ন যেখানে সমস্ত স্টেট পরিবর্তন ইভেন্ট হিসেবে সংরক্ষণ করা হয়, এবং এই ইভেন্টগুলি replay করে স্টেট পুনরুদ্ধার করা হয়।
- CQRS হল একটি প্যাটার্ন যেখানে Command (স্টেট পরিবর্তন) এবং Query (স্টেট পড়া) আলাদা করে দেওয়া হয়, যা পারফরম্যান্স এবং স্কেলেবিলিটি উন্নত করে।
- Event Sourcing এবং CQRS একসাথে ব্যবহৃত হলে, আপনি একটি সিস্টেম ডিজাইন করতে পারেন যা দ্রুত এবং স্কেলেবল, এবং এর ডেটার ইতিহাস সংরক্ষণ এবং পুনরুদ্ধার করা সম্ভব।
স্কালা মেসেজ পাসিং সিস্টেম সাধারণত Akka লাইব্রেরি বা স্কালার অ্যাক্টর মডেল (Actor Model) ব্যবহার করে তৈরি করা হয়। মেসেজ পাসিং সিস্টেম হল একটি ডিস্ট্রিবিউটেড বা কনকারেন্ট সিস্টেম যেখানে বিভিন্ন অ্যাক্টর বা কম্পোনেন্ট একে অপরের মধ্যে মেসেজ পাস করে, এবং প্রতিটি অ্যাক্টর তাদের নিজস্ব স্টেট এবং কার্যক্ষমতা বজায় রেখে কাজ করে। এটি অ্যাক্টর মডেল-এর মূল ধারণা, যেখানে মেসেজ পাসিংয়ের মাধ্যমে অ্যাক্টররা যোগাযোগ করে এবং একে অপরের অবস্থান বা স্টেটকে পরিবর্তন না করে একে অপরের সাথে সমন্বয় করে কাজ করে।
স্কালায় Akka লাইব্রেরি এবং Akka Actor Model ব্যবহারের মাধ্যমে মেসেজ পাসিং সিস্টেম তৈরি করা সহজ এবং কার্যকরী হয়।
এই লেখায় আমরা Akka Actor Model এর মাধ্যমে স্কালায় মেসেজ পাসিং সিস্টেমের কিভাবে কাজ করে তা উদাহরণসহ জানবো।
১. Akka Actor Model
Akka Actor Model ব্যবহার করে স্কালায় মেসেজ পাসিং সিস্টেম তৈরি করা হয়, যেখানে অ্যাক্টর একটি ইনস্ট্যান্স হিসেবে কাজ করে এবং এটি মেসেজ গ্রহণ করে তার স্টেট পরিবর্তন করতে পারে। একটি অ্যাক্টর শুধুমাত্র নিজেই মেসেজ হ্যান্ডল করতে সক্ষম এবং অন্যান্য অ্যাক্টরের স্টেট পরিবর্তন করতে পারে না।
মূল ধারণা:
- অ্যাক্টর: একটি অ্যাক্টর হল একটি স্বতন্ত্র ইউনিট যা ইনপুট (মেসেজ) গ্রহণ করে এবং তার স্টেট পরিবর্তন করে। অ্যাক্টরের জন্য মূল কার্যকলাপ হল মেসেজ হ্যান্ডলিং।
- মেসেজ পাসিং: অ্যাক্টররা একে অপরকে মেসেজ পাস করে এবং পরবর্তীতে তাদের কাজ চালিয়ে যায়। মেসেজ একবার পাস করার পর, একে অন্য অ্যাক্টরের স্টেটের উপর কোনো প্রভাব পড়ে না, বরং প্রক্রিয়া একে অপরের মধ্যে সম্পূর্ণ বিচ্ছিন্নভাবে ঘটে।
২. Akka Actor Setup এবং ব্যবহার
এখানে একটি সাধারণ Akka অ্যাক্টর সেটআপ এবং মেসেজ পাসিং সিস্টেমের উদাহরণ দেয়া হলো।
২.১ Akka Actor Dependency যোগ করা
প্রথমে, আপনার build.sbt ফাইলে Akka Actor লাইব্রেরি যোগ করুন:
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.6.10"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.6.10" % Test২.২ Akka Actor উদাহরণ
এই উদাহরণে, আমরা একটি Greeter অ্যাক্টর তৈরি করব যা একটি গ্রীটিং মেসেজ গ্রহণ করবে এবং তারপরে এটি প্রতিক্রিয়া হিসেবে মেসেজ পাঠাবে।
import akka.actor.{Actor, ActorSystem, Props}
// Greeter actor
class Greeter extends Actor {
def receive = {
case "greet" =>
println("Hello, Actor Model!")
sender() ! "greeting sent" // Responding with a message
case _ =>
println("Unknown message")
}
}
object AkkaActorExample {
def main(args: Array[String]): Unit = {
val system = ActorSystem("actor-system")
// Create an instance of Greeter actor
val greeter = system.actorOf(Props[Greeter], "greeter-actor")
// Send a "greet" message to the Greeter actor
greeter ! "greet"
// Send a message that the actor does not understand
greeter ! "unknown"
// Stop the actor system gracefully
system.terminate()
}
}এখানে:
Greeterক্লাস হল একটি actor যা একটিreceiveমেথড দিয়ে মেসেজ গ্রহণ করে এবং উপযুক্ত প্রতিক্রিয়া প্রদান করে।actorOfমেথড ব্যবহার করে অ্যাক্টর তৈরি করা হয়েছে।greeter ! "greet": এই লাইনটি মেসেজ"greet"অ্যাক্টরের কাছে পাঠাচ্ছে।
২.৩ মেসেজ পাসিং
Akka-তে মেসেজ পাসিং খুবই সরল। আপনি শুধুমাত্র অ্যাক্টরের নাম এবং মেসেজ পাঠালেই, অ্যাক্টর তা গ্রহণ করে প্রক্রিয়া করবে। এটি এস্কেপাল টাইম এর মধ্যে হয়, এবং কখনও একাধিক অ্যাক্টর একে অপরকে মেসেজ পাঠায়।
এখানে:
sender(): এটি অ্যাক্টরের মাধ্যমে বর্তমান sender-এর রেফারেন্স গ্রহণ করে, যাতে প্রতিক্রিয়া পাঠানো যায়।!(অ্যাক্টর চিহ্ন) দিয়ে মেসেজ পাঠানো হয়।
৩. Akka মেসেজ পাসিং অ্যাপ্লিকেশন উদাহরণ
ধরা যাক, আমাদের দুটি অ্যাক্টর রয়েছে:
- SenderActor: এটি মেসেজ পাঠানোর দায়িত্ব পালন করবে।
- ReceiverActor: এটি মেসেজ গ্রহণ করবে এবং প্রক্রিয়া করবে।
import akka.actor.{Actor, ActorSystem, Props}
// Sender Actor
class SenderActor(receiver: akka.actor.ActorRef) extends Actor {
def receive = {
case "start" =>
println("Sender sending message to Receiver")
receiver ! "Hello, Receiver!"
}
}
// Receiver Actor
class ReceiverActor extends Actor {
def receive = {
case msg: String =>
println(s"Receiver received message: $msg")
}
}
object AkkaMessagePassing {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Message-Passing-System")
// Create Receiver actor
val receiver = system.actorOf(Props[ReceiverActor], "receiver")
// Create Sender actor
val sender = system.actorOf(Props(new SenderActor(receiver)), "sender")
// Start message passing
sender ! "start"
// Gracefully terminate the system
system.terminate()
}
}এখানে:
SenderActorমেসেজ পাঠায় এবংReceiverActorমেসেজ গ্রহণ করে এবং প্রিন্ট করে।receiver ! "Hello, Receiver!": Sender actor থেকে Receiver actor-এ মেসেজ পাঠানো হচ্ছে।
Output:
Sender sending message to Receiver
Receiver received message: Hello, Receiver!৪. Akka মেসেজ পাসিং এবং আন্ডারলাইন
- অ্যাক্টর মডেল স্কালায় মেসেজ পাসিং ব্যবস্থার মাধ্যমে কনকারেন্ট প্রোগ্রামিং খুব সহজ করে তোলে।
- একাধিক অ্যাক্টর একে অপরকে মেসেজ পাঠানোর মাধ্যমে ডিস্ট্রিবিউটেড সিস্টেম তৈরি করা যায় এবং অ্যাক্টরের মধ্যে মেসেজ সিঙ্ক্রোনাস এবং অ্যাসিঙ্ক্রোনাসভাবে পাস করা যায়।
৫. মেসেজ পাসিং সিস্টেমের সুবিধা
- ডিস্ট্রিবিউটেড এবং কনকারেন্ট প্রোগ্রামিং: মেসেজ পাসিং সিস্টেমের মাধ্যমে একাধিক অ্যাক্টর একে অপরের সাথে পারফেক্টলি কমিউনিকেশন করতে পারে, যা ডিস্ট্রিবিউটেড সিস্টেম এবং কনকারেন্ট প্রোগ্রামিংয়ে সহায়তা করে।
- ডিকাপলিং: অ্যাক্টররা একে অপরের স্টেটের সাথে সরাসরি যোগাযোগ না করে মেসেজ পাস করে, ফলে কোডের গঠন সহজ হয় এবং কমপ্লেক্সিটি কমে।
- স্কেলেবিলিটি: মেসেজ পাসিং সিস্টেমে নতুন অ্যাক্টর যোগ করা এবং স্কেল করা সহজ, যা ডিস্ট্রিবিউটেড সিস্টেমের জন্য উপযোগী।
সারাংশ
Akka Actor Model এবং Akka HTTP স্কালায় মেসেজ পাসিং সিস্টেমের জন্য অত্যন্ত শক্তিশালী ফ্রেমওয়ার্ক, যা ডিস্ট্রিবিউটেড সিস্টেম এবং কনকারেন্ট প্রোগ্রামিং তৈরিতে সহায়ক। Akka অ্যাক্টর মডেল ব্যবহার করে মেসেজ পাসিং সিস্টেম তৈরি করে কোডকে আরও নমনীয়, ডিকাপলড, এবং স্কেলেবল করা যায়।
Read more