স্ট্রিম প্রসেসিং হল একটি প্রক্রিয়া যার মাধ্যমে ডেটা একটানা, ধারাবাহিকভাবে এবং লেটেন্সি কম রেখে প্রসেস করা হয়। স্কালায় স্ট্রিম প্রসেসিং সাধারণত ফাংশনাল প্রোগ্রামিং কনসেপ্টে ভরপুর থাকে এবং এটি খুবই শক্তিশালী, বিশেষ করে যখন ডেটার পরিমাণ বড় এবং দ্রুত পরিবর্তিত হয়। স্কালায় স্ট্রিম প্রসেসিং করার জন্য Akka Streams, Scala's Standard Library এবং Apache Spark-এর মতো টুলস ব্যবহার করা হয়।
স্ট্রিম প্রসেসিং প্রধানত রিয়েল-টাইম ডেটা প্রসেসিংয়ের জন্য ব্যবহৃত হয়, যেমন লোকেশন ডেটা, সেন্সর ডেটা, বা সামাজিক মিডিয়া ফিডের ডেটা।
১. Scala’s Standard Library: Streams
স্কালার স্ট্যান্ডার্ড লাইব্রেরিতে Stream নামে একটি কনসেপ্ট রয়েছে, যা lazy (আলস) স্ট্রিম ডেটা তৈরি করে। এটি একটি immutable ডেটা স্ট্রাকচার, যার মানে একবার তৈরি হলে এর উপাদান পরিবর্তন করা যায় না।
১.১ Stream উদাহরণ
object StreamExample {
def main(args: Array[String]): Unit = {
// Creating a lazy stream
val stream = Stream.from(1) // Stream starting from 1, lazy evaluation
// Take the first 5 elements of the stream
val firstFive = stream.take(5).toList
println(firstFive) // Output: List(1, 2, 3, 4, 5)
}
}এখানে:
Stream.from(1)একটি স্ট্রিম তৈরি করেছে যা ১ থেকে শুরু হয়ে অনন্তসংখ্যক মান তৈরি করবে। তবে, এটি lazy বা হালকা হিসেবে কাজ করে, অর্থাৎ প্রয়োজন না হলে ডেটা উৎপন্ন হয় না।take(5)প্রথম ৫টি মান নেয় এবং.toListদ্বারা একটি লিস্টে রূপান্তরিত হয়।
১.২ ফিল্টার এবং ম্যাপ অপারেশন
স্ট্রিমে আপনি বিভিন্ন ফাংশনাল অপারেশন যেমন map, filter, flatMap ইত্যাদি প্রয়োগ করতে পারেন।
object StreamOperationsExample {
def main(args: Array[String]): Unit = {
val stream = Stream.from(1)
// Filter to get even numbers and take the first 5 even numbers
val evenNumbers = stream.filter(_ % 2 == 0).take(5).toList
println(evenNumbers) // Output: List(2, 4, 6, 8, 10)
}
}এখানে:
filterস্ট্রিমের সব ইভেন নম্বর বের করে, তারপর.take(5)প্রথম ৫টি ইভেন নম্বর নেয়।
২. Akka Streams (Real-Time Stream Processing)
Akka Streams স্কালার জন্য একটি শক্তিশালী স্ট্রিম প্রসেসিং লাইব্রেরি যা পারফরম্যান্স ও স্কেলেবিলিটির দিক থেকে অনেক উন্নত। এটি Reactive Streams স্পেসিফিকেশন অনুসরণ করে এবং backpressure ম্যানেজমেন্টসহ স্ট্রিম প্রসেসিং সমাধান প্রদান করে।
২.১ Akka Streams উদাহরণ
Akka Streams ব্যবহার করতে হলে, আপনাকে প্রথমে Akka Streams লাইব্রেরি অ্যাড করতে হবে (যেমন build.sbt ফাইলে):
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.6.10"এখানে একটি সিম্পল Akka Streams উদাহরণ:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Source}
object AkkaStreamsExample {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("AkkaStreamsExample")
implicit val materializer = ActorMaterializer()
// A simple Source emitting integers
val source = Source(1 to 10)
// Processing the source and printing the elements
source.runForeach(println) // Prints numbers from 1 to 10
// Shutdown the actor system
system.terminate()
}
}এখানে:
Source(1 to 10): এটি একটি সোর্স তৈরি করেছে যা ১ থেকে ১০ পর্যন্ত সংখ্যাগুলি emit করে।runForeach: স্ট্রিমের উপাদানগুলিকে প্রসেস করে (এখানে, প্রতিটি উপাদানকে প্রিন্ট করছে)।- ActorMaterializer: Akka Streams এর কাজ করার জন্য প্রয়োজনীয় কম্পোনেন্ট, যা স্ট্রিমগুলিকে বাস্তবায়ন (materialize) করতে ব্যবহৃত হয়।
২.২ Akka Streams-এর সাথে Backpressure
Akka Streams এর একটি গুরুত্বপূর্ণ সুবিধা হল backpressure management। যদি ডেটা প্রসেস করার গতি অত্যন্ত দ্রুত হয় এবং প্রসেসর আস্তে আস্তে ডেটা গ্রহণ করতে পারে, তখন backpressure স্বয়ংক্রিয়ভাবে সামঞ্জস্য করবে।
import akka.stream.scaladsl.{Source, Sink}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
object AkkaStreamsBackpressureExample {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("AkkaStreamsBackpressureExample")
implicit val materializer = ActorMaterializer()
val source = Source(1 to 10000)
// Use a sink that simulates slow processing
val sink = Sink.foreach[Int] { x =>
Thread.sleep(10) // Simulate slow processing
println(x)
}
// Running the source through the sink
source.to(sink).run()
system.terminate()
}
}এখানে:
Sink.foreachএকটি ফাংশন ব্যবহার করে স্ট্রিমের প্রতিটি উপাদানকে প্রসেস করছে, তবেThread.sleep(10)দিয়ে প্রসেসিং ধীর করা হয়েছে। এই কারণে Akka Streams স্বয়ংক্রিয়ভাবে backpressure প্রয়োগ করবে এবং সোর্স ধীর গতিতে ডেটা পাঠাবে।
৩. Apache Spark Streams
Apache Spark স্ট্রিম প্রসেসিংয়ের জন্য একটি অত্যন্ত জনপ্রিয় ফ্রেমওয়ার্ক। Spark স্ট্রিমিংকে ব্যবহার করে আপনি রিয়েল-টাইম ডেটা স্ট্রিমগুলি প্রসেস করতে পারবেন। Spark স্ট্রিমিংকে সাধারণত DStream (Discretized Stream) হিসেবে পরিচিত করা হয়।
৩.১ Apache Spark Streaming উদাহরণ
Apache Spark স্ট্রিমিং সেটআপের জন্য আপনাকে Spark Streaming লাইব্রেরি ইন্সটল করতে হবে।
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.0.1"এখানে একটি সহজ উদাহরণ দেওয়া হলো:
import org.apache.spark._
import org.apache.spark.streaming._
object SparkStreamingExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1)) // 1 second batch interval
val stream = ssc.socketTextStream("localhost", 9999) // Reading stream from socket
// Processing the stream
val words = stream.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print() // Print the word counts
ssc.start()
ssc.awaitTermination()
}
}এখানে:
ssc.socketTextStream: এটি একটি স্ট্রিম তৈরি করে যা একটি নির্দিষ্ট পোর্ট (এখানে9999) থেকে ডেটা নেয়।flatMap: স্ট্রিমের প্রত্যেকটি টেক্সট ডেটাকে শব্দে ভাগ করে।reduceByKey: শব্দগুলির সংখ্যা গণনা করতে ব্যবহৃত হচ্ছে।
সারাংশ
স্কালায় স্ট্রিম প্রসেসিং Akka Streams, Scala's Standard Library (Stream), এবং Apache Spark-এর মতো বিভিন্ন টুল ব্যবহার করে করা যায়। এগুলি রিয়েল-টাইম ডেটা প্রসেসিংয়ের জন্য উপযুক্ত এবং ফাংশনাল প্রোগ্রামিং কনসেপ্টের উপর ভিত্তি করে শক্তিশালী স্ট্রিম প্রসেসিং সমাধান প্রদান করে। Akka Streams এবং Spark স্ট্রিমিং দুটি ব্যাপকভাবে ব্যবহৃত টুল, যা বড় ডেটা বা রিয়েল-টাইম ডেটা প্রসেসিংয়ে সহায়ক।
Read more