Spark SQL-এ Nested Transformations হল এমন ট্রান্সফর্মেশন যা একাধিক স্টেপে ডেটা প্রক্রিয়া করে, যেখানে একটি ট্রান্সফর্মেশন অন্য ট্রান্সফর্মেশন বা অপারেশনের মধ্যে থাকে। Spark SQL এবং DataFrame/Dataset API-তে Nested Transformations ব্যবহার করে ডেটাকে আরও জটিলভাবে প্রক্রিয়া করা সম্ভব। Nested Transformations অত্যন্ত শক্তিশালী টুল যা ডেটাকে filter, map, aggregate ইত্যাদি ধাপে ধাপে প্রক্রিয়া করতে সাহায্য করে।
এই টিউটোরিয়ালে আমরা Nested Transformations এর বিভিন্ন উদাহরণ এবং তাদের পারফরম্যান্স ইমপ্যাক্ট সম্পর্কে আলোচনা করব।
১. Nested Transformations এর ব্যবহার
Nested Transformations সাধারণত একটি স্টেপে একটি ট্রান্সফর্মেশন অ্যাপ্লাই করার পর তার উপর পরবর্তী ট্রান্সফর্মেশন অ্যাপ্লাই করে। এই ধরনের ট্রান্সফর্মেশন ডেটা শেপিং এবং ক্লিনিং-এ অত্যন্ত কার্যকরী হতে পারে।
উদাহরণ ১: DataFrame-এ Nested map এবং filter ব্যবহার
# SparkSession তৈরি
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Nested Transformations Example").getOrCreate()
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30), ("David", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Nested Transformation: filter followed by map
df_transformed = df.filter(df['age'] > 25).rdd.map(lambda x: (x[0], x[1] + 1))
df_transformed.collect()
এখানে, প্রথমে filter ট্রান্সফর্মেশন ব্যবহার করে বয়স ২৫ এর বেশি এমন রেকর্ডগুলোকে ফিল্টার করা হয়েছে, এবং পরে map ট্রান্সফর্মেশন ব্যবহার করে তাদের বয়সে ১ যোগ করা হয়েছে। এই দুটি ট্রান্সফর্মেশন nested হয়ে একসাথে কাজ করেছে।
আউটপুট:
[('Alice', 29), ('Charlie', 31), ('David', 36)]
এখানে, filter এবং map একত্রে ব্যবহার করা হয়েছে, যাতে প্রথমে ফিল্টার করা হয় এবং তারপর সেটির উপর একটি নতুন মান (age + 1) অ্যাসাইন করা হয়।
২. Nested Aggregations
Nested Aggregations হল এমন একটি কৌশল যেখানে একাধিক গ্রুপিং বা অ্যাগ্রিগেট অপারেশন একে অপরের ভিতরে চলে। এটি বড় ডেটাসেটের উপর জটিল গাণিতিক পরিসংখ্যান বের করার জন্য ব্যবহৃত হয়, যেমন গড়, সর্বোচ্চ, সর্বনিম্ন, গুণফল ইত্যাদি।
উদাহরণ ২: Nested groupBy এবং agg ব্যবহার
# DataFrame তৈরি
data = [("Alice", "HR", 1000), ("Bob", "HR", 1500), ("Charlie", "Finance", 2000),
("David", "Finance", 2500), ("Eva", "HR", 1200)]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Nested aggregation: প্রথমে department অনুযায়ী গ্রুপিং তারপর salary এর গড় বের করা
result = df.groupBy("department").agg(
{"salary": "avg"}).alias("avg_salary")
result.show()
এখানে, প্রথমে groupBy("department") ব্যবহার করে ডেটাকে বিভাগ অনুযায়ী গ্রুপ করা হয়েছে এবং তারপর সেই গ্রুপের salary কলামের গড় (average) বের করার জন্য agg ফাংশন ব্যবহার করা হয়েছে।
আউটপুট:
+---------+-----------+
|department|avg(salary)|
+---------+-----------+
|HR |1233.33 |
|Finance |2250.00 |
+---------+-----------+
এখানে, groupBy এবং agg ব্যবহার করে দুটি ট্রান্সফর্মেশন nested হয়ে কাজ করেছে।
৩. Nested withColumn and select Transformations
withColumn এবং select হলো Spark SQL-এর অন্যতম গুরুত্বপূর্ণ ট্রান্সফর্মেশন। এই দুটি ট্রান্সফর্মেশন একে অপরের মধ্যে nested ভাবে ব্যবহার করা যেতে পারে। উদাহরণস্বরূপ, আপনি withColumn ব্যবহার করে নতুন কলাম তৈরি করতে পারেন এবং তারপরে select ব্যবহার করে নির্দিষ্ট কলাম নির্বাচন করতে পারেন।
উদাহরণ ৩: Nested withColumn এবং select ব্যবহার
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30), ("David", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Nested Transformation: withColumn followed by select
df_transformed = df.withColumn("age_plus_ten", df['age'] + 10).select("name", "age_plus_ten")
df_transformed.show()
এখানে, withColumn ব্যবহার করে একটি নতুন কলাম age_plus_ten তৈরি করা হয়েছে, যা age কলামের উপর ১০ যোগ করে। তারপর select ব্যবহার করে নির্দিষ্ট কলাম নির্বাচন করা হয়েছে।
আউটপুট:
+-------+-----------+
| name|age_plus_ten|
+-------+-----------+
| Alice| 38|
| Bob| 35|
|Charlie| 40|
| David| 45|
+-------+-----------+
এখানে, withColumn এবং select দুটি ট্রান্সফর্মেশন nested হয়ে কাজ করেছে, প্রথমে একটি নতুন কলাম তৈরি করা হয়েছে এবং পরে নির্দিষ্ট কলামগুলো নির্বাচন করা হয়েছে।
৪. Chaining Multiple Transformations
Spark SQL-এ multiple transformations chaining করা খুবই সাধারণ, যেখানে একাধিক ট্রান্সফর্মেশন একের পর এক ডেটার উপর প্রয়োগ করা হয়। উদাহরণস্বরূপ, প্রথমে ফিল্টারিং, তারপর গ্রুপিং, তারপর অগ্রগতি।
উদাহরণ ৪: Chaining filter, groupBy, এবং agg Transformations
# DataFrame তৈরি
data = [("Alice", "HR", 1000), ("Bob", "HR", 1500), ("Charlie", "Finance", 2000),
("David", "Finance", 2500), ("Eva", "HR", 1200)]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Chained transformations: filter -> groupBy -> agg
result = df.filter(df["salary"] > 1500).groupBy("department").agg({"salary": "avg"}).alias("avg_salary")
result.show()
এখানে, একসাথে filter, groupBy, এবং agg ট্রান্সফর্মেশনগুলি chaining করা হয়েছে। প্রথমে বেতন ১৫০০ এর বেশি এমন ব্যক্তিদের ফিল্টার করা হয়েছে, তারপর তাদের ডিপার্টমেন্ট অনুযায়ী গ্রুপিং করা হয়েছে এবং শেষে salary এর গড় বের করা হয়েছে।
আউটপুট:
+---------+-----------+
|department|avg(salary)|
+---------+-----------+
|Finance |2250.00 |
+---------+-----------+
এখানে, একাধিক ট্রান্সফর্মেশন chaining করা হয়েছে যা ডেটাকে প্রসেস করার জন্য একটি সংযুক্ত পদ্ধতি প্রদান করেছে।
সারাংশ
Nested Transformations Spark SQL-এ একাধিক ট্রান্সফর্মেশন একসাথে চালানোর একটি শক্তিশালী উপায়। map, flatMap, filter, groupBy, withColumn, select, এবং agg এর মতো ট্রান্সফর্মেশনগুলি একে অপরের মধ্যে nestedভাবে প্রয়োগ করা যায়। এসব Nested Transformations ব্যবহার করে ডেটাকে জটিলভাবে প্রক্রিয়া করা সম্ভব এবং এতে ডেটার পরিষ্কারকরণ, অ্যানালাইসিস এবং অন্যান্য অপারেশনগুলো দ্রুত করা যায়।
Read more