Spark SQL-এ DataFrames হল একটি গুরুত্বপূর্ণ ডেটা স্ট্রাকচার, যা রিলেশনাল ডেটাবেসের টেবিলের মতো কাজ করে এবং বিভিন্ন ধরণের ট্রান্সফর্মেশন করতে সহায়ক। DataFrame এর উপর Advanced Transformations কার্যকরভাবে ডেটা প্রসেসিং, ক্লিনিং, এবং বিশ্লেষণ করতে ব্যবহৃত হয়। এই ট্রান্সফর্মেশনগুলো সাধারণত wide এবং narrow হয়ে থাকে, যেখানে narrow transformation কম্পিউটেশনের জন্য খুব কম ডেটা শিফট করে, এবং wide transformation অনেক ডেটা শিফট করে।
এই টিউটোরিয়ালে আমরা Spark SQL-এ DataFrame এর জন্য কিছু advanced transformations নিয়ে আলোচনা করবো, যেমন flatMap(), groupBy(), agg(), map(), join(), withColumn(), ইত্যাদি।
১. flatMap() Transformation
flatMap() হলো একটি transformation, যা একটি কলামের প্রতিটি আইটেমকে একাধিক রেকর্ডে রূপান্তরিত করে। এটি সাধারণত ব্যবহার করা হয় যখন আপনি একটি কলামের প্রতিটি ভ্যালু থেকে একাধিক আইটেম তৈরি করতে চান।
উদাহরণ:
from pyspark.sql import SparkSession
# SparkSession তৈরি
spark = SparkSession.builder.appName("flatMap Example").getOrCreate()
# DataFrame তৈরি
data = [("John", ["Math", "Physics"]), ("Alice", ["Biology", "Chemistry"])]
columns = ["Name", "Subjects"]
df = spark.createDataFrame(data, columns)
# flatMap ব্যবহার করা
from pyspark.sql.functions import explode
df_flatmap = df.withColumn("Subject", explode(df["Subjects"]))
# DataFrame দেখানো
df_flatmap.show()
আউটপুট:
+-----+-------------------+---------+
| Name| Subjects| Subject|
+-----+-------------------+---------+
| John| [Math, Physics] | Math|
| John| [Math, Physics] | Physics|
|Alice| [Biology, Chemistry]| Biology|
|Alice| [Biology, Chemistry]|Chemistry|
+-----+-------------------+---------+
এখানে:
explode()ফাংশনটিflatMap()এর মতো কাজ করে এবং একক কলামকে একাধিক রেকর্ডে রূপান্তরিত করে।
২. groupBy() এবং agg() Transformation
groupBy() এবং agg() ব্যবহার করে আপনি DataFrame এর উপর বিভিন্ন অ্যাগ্রিগেশন অপারেশন করতে পারেন, যেমন গড়, মোট, সর্বোচ্চ, সর্বনিম্ন ইত্যাদি। groupBy() ব্যবহার করে একটি বা একাধিক কলাম ভিত্তিতে ডেটা গ্রুপ করা হয়, এবং তারপর agg() ব্যবহার করে অ্যাগ্রিগেশন ফাংশন প্রয়োগ করা হয়।
উদাহরণ:
# DataFrame তৈরি
data = [("John", "HR", 3000), ("Alice", "Finance", 5000), ("Bob", "HR", 3500), ("Dave", "Finance", 5500)]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# groupBy এবং agg ব্যবহার করা
df_grouped = df.groupBy("Department").agg({"Salary": "avg", "Salary": "sum"})
# ফলাফল দেখানো
df_grouped.show()
আউটপুট:
+----------+--------+--------+
|Department|avg(Salary)|sum(Salary)|
+----------+--------+--------+
| HR| 3250.0| 6500|
| Finance| 5250.0| 10500|
+----------+--------+--------+
এখানে:
groupBy("Department")ডেটাকে বিভাগ অনুসারে গ্রুপ করে।agg({"Salary": "avg", "Salary": "sum"})দ্বারা প্রতিটি বিভাগের জন্য গড় এবং যোগফল হিসাব করা হয়েছে।
৩. map() Transformation
map() ফাংশনটি DataFrame এর প্রতিটি রেকর্ডে একটি নির্দিষ্ট ফাংশন প্রয়োগ করতে ব্যবহার করা হয়। এটি মূলত RDD-র মত কাজ করে, যেখানে একটি কলামের প্রতিটি আইটেমের উপর নির্দিষ্ট লজিক প্রয়োগ করা হয়।
উদাহরণ:
# map ব্যবহার করা
rdd = df.rdd.map(lambda row: (row["Name"], row["Salary"] * 1.1))
# নতুন DataFrame তৈরি করা
df_map = rdd.toDF(["Name", "New Salary"])
# DataFrame দেখানো
df_map.show()
আউটপুট:
+-----+---------+
| Name|New Salary|
+-----+---------+
| John| 3300.0|
|Alice| 5500.0|
| Bob| 3850.0|
| Dave| 6050.0|
+-----+---------+
এখানে:
map()ব্যবহার করেSalaryকলামের উপর একটি লজিক প্রয়োগ করা হয়েছে, যা ১০% ইনক্রিমেন্ট যোগ করে।
৪. join() Transformation
join() ফাংশনটি দুটি DataFrame একত্রিত করার জন্য ব্যবহৃত হয়। এটি SQL-এ JOIN এর মতো কাজ করে এবং একাধিক ধরনের join (INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL JOIN) সমর্থন করে।
উদাহরণ:
# DataFrame তৈরি
data1 = [("John", "HR"), ("Alice", "Finance"), ("Bob", "IT")]
columns1 = ["Name", "Department"]
df1 = spark.createDataFrame(data1, columns1)
data2 = [("John", "Manager"), ("Alice", "Analyst")]
columns2 = ["Name", "Position"]
df2 = spark.createDataFrame(data2, columns2)
# join() ব্যবহার করা
df_joined = df1.join(df2, "Name", "inner")
# DataFrame দেখানো
df_joined.show()
আউটপুট:
+-----+----------+--------+
| Name|Department| Position|
+-----+----------+--------+
| John| HR| Manager|
|Alice| Finance| Analyst|
+-----+----------+--------+
এখানে:
join(df2, "Name", "inner")দুইটি DataFrame-কেNameকলামের উপর INNER JOIN করেছে।
৫. withColumn() Transformation
withColumn() ফাংশনটি একটি নতুন কলাম যোগ করতে বা পূর্বের কলামের মান পরিবর্তন করতে ব্যবহৃত হয়। এটি একটি DataFrame এর উপর নির্দিষ্ট ট্রান্সফর্মেশন প্রয়োগ করে একটি নতুন কলাম তৈরি করে।
উদাহরণ:
from pyspark.sql.functions import col
# withColumn ব্যবহার করা
df_new = df.withColumn("New Salary", col("Salary") * 1.2)
# DataFrame দেখানো
df_new.show()
আউটপুট:
+-----+--------+---------+-----------+
| Name| Department| Salary| New Salary|
+-----+--------+---------+-----------+
| John| HR| 3000.0| 3600.0|
|Alice| Finance| 5000.0| 6000.0|
| Bob| HR| 3500.0| 4200.0|
| Dave| Finance| 5500.0| 6600.0|
+-----+--------+---------+-----------+
এখানে:
withColumn()ব্যবহার করেSalaryকলামের উপর একটি ২০% ইনক্রিমেন্ট যোগ করে একটি নতুন কলামNew Salaryতৈরি করা হয়েছে।
সারাংশ
Spark SQL-এর Advanced Transformations ব্যবহার করে আপনি DataFrame-এর উপর বিভিন্ন ধরনের জটিল ট্রান্সফর্মেশন করতে পারেন, যেমন flatMap(), groupBy(), agg(), map(), join(), withColumn() ইত্যাদি। এই ট্রান্সফর্মেশনগুলো ডেটার ওপর উচ্চ কার্যক্ষমতা সম্পন্ন অপারেশন করতে সহায়তা করে এবং ডেটা বিশ্লেষণ ও প্রসেসিংকে আরও কার্যকরী করে তোলে। Spark SQL DataFrame API-র এই ফিচারগুলি আপনাকে ডেটাকে বিভিন্নভাবে পরিবর্তন করতে, নতুন কলাম যোগ করতে এবং বিভিন্ন গ্রুপিং, অ্যাগ্রিগেশন অপারেশন চালাতে সহায়তা করবে।
Spark SQL-এ Advanced Transformations যেমন map, flatMap, এবং reduceByKey খুবই গুরুত্বপূর্ণ এবং কার্যকরী অপারেশন, যা RDD (Resilient Distributed Dataset) বা DataFrame/Dataset-এ ডেটা প্রসেসিং করার সময় ব্যবহৃত হয়। এই অপারেশনগুলি সাধারণত ডেটার উপর জটিল ট্রান্সফর্মেশন করতে সাহায্য করে এবং Spark-এর ইন-মেমরি কম্পিউটিং সুবিধাকে পুরোপুরি ব্যবহার করতে সক্ষম। চলুন, এই Advanced Transformations-এর প্রতিটি সম্পর্কে বিস্তারিত জানি।
১. map Transformation
map Transformation হল এমন একটি অপারেশন, যা RDD বা DataFrame/Dataset এর প্রতিটি উপাদানের উপর একটি নির্দিষ্ট ফাংশন প্রয়োগ করে একটি নতুন RDD বা DataFrame তৈরি করে। map সাধারণত প্রত্যেকটি ইনপুট ভ্যালুর উপর একটি একক আউটপুট ভ্যালু তৈরি করে।
উদাহরণ:
# SparkSession তৈরি
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("map Transformation Example").getOrCreate()
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# map Transformation ব্যবহার: age এর উপর 2 যোগ করা
df_transformed = df.rdd.map(lambda x: (x[0], x[1] + 2))
df_transformed.collect()
আউটপুট:
[('Alice', 30), ('Bob', 27), ('Charlie', 32)]
এখানে, map Transformation ব্যবহার করে age-এর ওপর ২ যোগ করা হয়েছে এবং একটি নতুন RDD তৈরি করা হয়েছে।
২. flatMap Transformation
flatMap Transformation হল এমন একটি অপারেশন, যা RDD বা DataFrame/Dataset এর প্রতিটি উপাদানের উপর একটি ফাংশন প্রয়োগ করে একাধিক আউটপুট তৈরি করতে পারে। map এর মতো একটি একক আউটপুট ভ্যালু তৈরি না করে, flatMap একটি ভেক্টর বা তালিকা (list) তৈরি করে। এই কারণে, flatMap বিভিন্ন আউটপুট তৈরি করতে সক্ষম।
উদাহরণ:
# DataFrame তৈরি
data = [("Alice", "HR"), ("Bob", "Finance"), ("Charlie", "IT")]
columns = ["name", "department"]
df = spark.createDataFrame(data, columns)
# flatMap Transformation ব্যবহার: নামের প্রত্যেক অক্ষরের ওপর কাজ করা
flat_map_result = df.rdd.flatMap(lambda x: x[0]) # নামের প্রতিটি অক্ষর আলাদা আলাদা করে দিবে
flat_map_result.collect()
আউটপুট:
['A', 'l', 'i', 'c', 'e', 'B', 'o', 'b', 'C', 'h', 'a', 'r', 'l', 'i', 'e']
এখানে, flatMap Transformation ব্যবহার করে, name কলামের প্রত্যেকটি অক্ষর আলাদা আলাদা করে বের করা হয়েছে।
৩. reduceByKey Transformation
reduceByKey Transformation হল একটি বিশেষ ধরনের অপারেশন, যা কিপর্যন্ত একক কিপেয়ার ভ্যালুদের উপর একটি রিডাকশন অপারেশন চালায়। এটি সাধারণত ব্যবহৃত হয় যখন আপনি একটি কিপেয়ার ডেটা (যেমন key-value পেয়ার) উপর কোনো অ্যাগ্রিগেশন করতে চান, যেমন যোগফল বা গুনফল বের করা। reduceByKey মূলত key-র উপর একত্রিত (combine) করে ভ্যালুগুলিকে রিডিউস করে।
উদাহরণ:
# Key-Value পেয়ার হিসেবে DataFrame তৈরি
data = [("Alice", 3), ("Bob", 2), ("Alice", 4), ("Bob", 5)]
columns = ["name", "score"]
rdd = spark.sparkContext.parallelize(data)
# reduceByKey ব্যবহার: নামের প্রতি স্কোর যোগ করা
reduced_result = rdd.reduceByKey(lambda a, b: a + b)
reduced_result.collect()
আউটপুট:
[('Alice', 7), ('Bob', 7)]
এখানে, reduceByKey Transformation ব্যবহার করে প্রতিটি নামের স্কোর যোগ করা হয়েছে। Alice এবং Bob এর স্কোর একত্রিত করা হয়েছে এবং তাদের মোট স্কোর বের করা হয়েছে।
Spark SQL-এ Advanced Transformations এর প্রয়োগ
Spark SQL-এ এই Advanced Transformations-গুলি RDD এবং DataFrame/Dataset উভয়ের জন্য ব্যবহৃত হতে পারে। DataFrame বা Dataset-এ RDD API ব্যবহার করার মাধ্যমে map, flatMap, এবং reduceByKey রূপান্তরের কার্যক্ষমতা বাড়ানো যেতে পারে।
উদাহরণ (DataFrame এ map এবং flatMap ব্যবহার):
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# map Transformation ব্যবহার: age এর ওপর 2 যোগ করা
df_transformed = df.rdd.map(lambda x: (x[0], x[1] + 2))
# flatMap Transformation ব্যবহার: নামের প্রতি অক্ষরের জন্য flatMap
flat_map_result = df.rdd.flatMap(lambda x: x[0])
flat_map_result.collect()
উদাহরণ (DataFrame এ reduceByKey ব্যবহার):
# Key-Value DataFrame তৈরি
data = [("Alice", 3), ("Bob", 2), ("Alice", 4), ("Bob", 5)]
columns = ["name", "score"]
df = spark.createDataFrame(data, columns)
# RDD এ রূপান্তর এবং reduceByKey প্রয়োগ
rdd = df.rdd
reduced_result = rdd.map(lambda x: (x[0], x[1])).reduceByKey(lambda a, b: a + b)
reduced_result.collect()
সারাংশ
Spark SQL-এ Advanced Transformations যেমন map, flatMap, এবং reduceByKey ডেটা প্রসেসিংয়ের অত্যন্ত শক্তিশালী টুলস। map Transformation প্রতিটি উপাদানকে নতুন ভ্যালুতে রূপান্তরিত করে, flatMap একাধিক আউটপুট তৈরি করতে পারে, এবং reduceByKey একটি key-value পেয়ার নিয়ে একত্রিত এবং রিডিউস অপারেশন করতে সাহায্য করে। এই ট্রান্সফর্মেশনগুলির মাধ্যমে Spark SQL ডেটার উপর জটিল অ্যাগ্রিগেশন, ট্রান্সফরমেশন এবং ম্যানিপুলেশন করতে সক্ষম হয়, যা ডিস্ট্রিবিউটেড কম্পিউটিংয়ের জন্য অত্যন্ত কার্যকরী।
Aggregations এবং Data Summarization হল ডেটা অ্যানালাইসিসের অত্যন্ত গুরুত্বপূর্ণ অংশ, যা আপনাকে ডেটার বিভিন্ন পরিসংখ্যানিক সংক্ষিপ্ত রূপ দেখতে সাহায্য করে। Spark SQL-এর মাধ্যমে আপনি ডেটাকে বিভিন্ন দৃষ্টিকোণ থেকে বিশ্লেষণ করতে পারেন, যেমন গড় (mean), যোগফল (sum), গুনফল (product), সর্বোচ্চ (max), সর্বনিম্ন (min) ইত্যাদি। এই ধরনের অপারেশনগুলি সাধারণত Group By ও Aggregation Functions ব্যবহার করে করা হয়। চলুন, Spark SQL-এ Aggregations এবং Data Summarization Techniques এর ব্যবহার দেখে নিই।
Aggregations এবং Data Summarization Techniques
১. Basic Aggregation Functions
Spark SQL এ কিছু সাধারণ Aggregation Functions রয়েছে, যা ডেটা Summarization করার জন্য ব্যবহৃত হয়। এই ফাংশনগুলো আপনাকে ডেটার উপর বিভিন্ন পরিসংখ্যানিক অপারেশন করতে সহায়তা করে।
প্রধান Aggregation Functions:
count(): একটি কলামের মোট রেকর্ডের সংখ্যা গণনা করে।sum(): একটি কলামের মোট যোগফল হিসাব করে।avg(): একটি কলামের গড় মান হিসাব করে।min(): একটি কলামের সর্বনিম্ন মান হিসাব করে।max(): একটি কলামের সর্বোচ্চ মান হিসাব করে।first(): একটি কলামের প্রথম মান নেয়।last(): একটি কলামের শেষ মান নেয়।
উদাহরণ:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum, min, max
# SparkSession তৈরি
spark = SparkSession.builder.appName("Aggregation Example").getOrCreate()
# DataFrame তৈরি
data = [("John", "HR", 30, 5000),
("Alice", "Finance", 25, 6000),
("Bob", "IT", 35, 5500),
("Dave", "HR", 32, 7000),
("Eve", "Finance", 28, 8000)]
columns = ["Name", "Department", "Age", "Salary"]
df = spark.createDataFrame(data, columns)
# Aggregation Function ব্যবহার
df_aggregated = df.groupBy("Department").agg(
avg("Age").alias("avg_age"),
sum("Salary").alias("total_salary"),
min("Age").alias("min_age"),
max("Age").alias("max_age")
)
df_aggregated.show()
আউটপুট:
+----------+-------+-----------+-------+-------+
|Department|avg_age|total_salary|min_age|max_age|
+----------+-------+-----------+-------+-------+
| HR| 31.0| 12000| 30| 32|
| Finance| 26.5| 14000| 25| 28|
| IT| 35.0| 5500| 35| 35|
+----------+-------+-----------+-------+-------+
এখানে:
groupBy("Department"): ডেটাকে বিভাগের ভিত্তিতে গ্রুপ করা হয়েছে।agg(): বিভিন্ন অ্যাগ্রিগেশন ফাংশন প্রয়োগ করা হয়েছে, যেমনavg(),sum(),min(), এবংmax()।
২. Group By Operation
Group By একটি SQL অপারেশন, যা ডেটাকে একটি বা একাধিক কলামের ভিত্তিতে গ্রুপ করে এবং প্রতিটি গ্রুপের জন্য একটি নির্দিষ্ট অ্যাগ্রিগেশন ফাংশন প্রয়োগ করে। এটি সাধারণত বিভিন্ন ধরনের রিপোর্ট তৈরি এবং ডেটার সারাংশ তৈরি করতে ব্যবহৃত হয়।
উদাহরণ:
# Group By এবং Aggregation ফাংশন ব্যবহার
df_grouped = df.groupBy("Department").agg(
count("Name").alias("count"),
avg("Salary").alias("avg_salary")
)
df_grouped.show()
আউটপুট:
+----------+-----+----------+
|Department|count|avg_salary|
+----------+-----+----------+
| HR| 2| 6000.0 |
| Finance| 2| 7000.0 |
| IT| 1| 5500.0 |
+----------+-----+----------+
এখানে:
count("Name"):Departmentঅনুযায়ী নামের সংখ্যা গণনা করা হয়েছে।avg("Salary"): প্রতিটি বিভাগের গড় বেতন হিসাব করা হয়েছে।
৩. Window Functions for Advanced Aggregations
Window Functions ব্যবহার করে আপনি বিভিন্ন ধরনের আরো জটিল অ্যাগ্রিগেশন করতে পারেন। এটি সাধারণত রোলিং সুম, রানিং টোটাল, বা পার্টিশন-বাই-পার্টিশন অপারেশনগুলির জন্য ব্যবহৃত হয়। Spark SQL-এ আপনি window ফাংশন ব্যবহার করে এই ধরনের অ্যাগ্রিগেশন করতে পারেন।
উদাহরণ:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Window function ব্যবহার
windowSpec = Window.partitionBy("Department").orderBy("Salary")
# প্রতিটি বিভাগের জন্য Salary অনুযায়ী row_number দিন
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))
df_with_row_number.show()
আউটপুট:
+-----+----------+---+------+-----------+
| Name|Department|Age|Salary|row_number|
+-----+----------+---+------+-----------+
| John| HR| 30| 5000| 1|
| Dave| HR| 32| 7000| 2|
|Alice| Finance| 25| 6000| 1|
| Eve| Finance| 28| 8000| 2|
| Bob| IT| 35| 5500| 1|
+-----+----------+---+------+-----------+
এখানে:
Window.partitionBy("Department").orderBy("Salary"): প্রতি বিভাগে বেতনের অর্ডার অনুসারে row number দেওয়া হয়েছে।
৪. Handling Missing Data in Aggregations
Spark SQL-এ Missing Data বা Null ভ্যালু পরিচালনার জন্য na.fill(), na.drop() বা na.replace() ফাংশন ব্যবহার করা যায়। এই ফাংশনগুলির মাধ্যমে আপনি অ্যাগ্রিগেশন করার আগে বা পরে ডেটার গ্যাপ পূরণ করতে পারেন।
উদাহরণ:
# Missing data পূর্ণ করা
df_filled = df.na.fill({"Salary": 0, "Age": 30})
df_filled.show()
এখানে:
na.fill()ব্যবহার করেSalaryকলামটিতে যেসব নাল (Null) ভ্যালু ছিল, তা পূর্ণ করা হয়েছে।
৫. Data Summarization Techniques
Spark SQL-এ Data Summarization একটি অত্যন্ত কার্যকরী কৌশল, যা আপনাকে ডেটার একটি সংক্ষিপ্ত, কিন্তু স্পষ্ট দৃশ্য প্রদান করে। এই কৌশলটির মাধ্যমে আপনি ডেটার উপর বিভিন্ন পরিসংখ্যানিক বিশ্লেষণ করতে পারেন।
Describe Function:
Spark SQL-এ describe() ফাংশন ব্যবহার করে একটি DataFrame-এর গড়, সর্বনিম্ন, সর্বোচ্চ, স্ট্যান্ডার্ড ডেভিয়েশন ইত্যাদি সম্পর্কে সংক্ষিপ্ত তথ্য পাওয়া যায়।
# describe function ব্যবহার
df.describe().show()
আউটপুট:
+-------+-----+----------+----+------+
|summary| Name|Department| Age|Salary|
+-------+-----+----------+----+------+
| count| 5| 5| 5| 5|
| mean| null| null| 29.0| 6700|
| stddev| null| null| 3.14| 1202|
| min| Alice| Finance| 25| 5000|
| max| John| HR| 35| 8000|
+-------+-----+----------+----+------+
এখানে:
describe()ডেটার মোট রেকর্ড, গড়, স্ট্যান্ডার্ড ডেভিয়েশন, সর্বনিম্ন এবং সর্বোচ্চ মান দেখায়।
সারাংশ
Spark SQL-এ Aggregations এবং Data Summarization Techniques আপনাকে ডেটার উপর বিভিন্ন পরিসংখ্যানিক অপারেশন এবং বিশ্লেষণ করতে সহায়তা করে। Group By, Aggregation Functions, Window Functions, এবং Data Summarization পদ্ধতির মাধ্যমে আপনি ডেটার থেকে মূল্যবান তথ্য উপস্থাপন করতে পারেন। Describe() ফাংশন এবং অন্যান্য অপারেশনগুলির মাধ্যমে ডেটার সারাংশ তৈরি করা সহজ এবং দ্রুত হয়, যা ডেটা বিশ্লেষণে গুরুত্বপূর্ণ ভূমিকা পালন করে।
Advanced Filtering Techniques ব্যবহার করে Spark SQL-এ DataFrame বা টেবিল থেকে নির্দিষ্ট ডেটা নির্বাচন করা যায়। যখন ডেটাসেট বড় এবং জটিল হয়, তখন সঠিক এবং কার্যকরীভাবে ডেটা ফিল্টার করা খুবই গুরুত্বপূর্ণ। Spark SQL-এ বিভিন্ন উন্নত ফিল্টারিং কৌশল রয়েছে, যা ব্যবহার করে আপনি ডেটা আরও সূক্ষ্মভাবে নির্বাচন করতে পারবেন।
এখানে, আমরা Spark SQL-এ DataFrame এর জন্য Advanced Filtering Techniques আলোচনা করব, যেমন complex conditions, null value filtering, regular expressions, এবং filtering with multiple columns।
১. Complex Filtering with Multiple Conditions
Spark SQL-এ একাধিক শর্তের মাধ্যমে ফিল্টারিং করা সম্ভব, যেখানে AND এবং OR লজিক্যাল অপারেটর ব্যবহার করা হয়।
উদাহরণ:
from pyspark.sql import SparkSession
# SparkSession তৈরি
spark = SparkSession.builder.appName("Advanced Filtering Example").getOrCreate()
# উদাহরণ DataFrame তৈরি
data = [("Alice", 28, "HR", 2000),
("Bob", 35, "IT", 5000),
("Charlie", 40, "HR", 4000),
("David", 30, "IT", 4500),
("Eva", 32, "Finance", 5500)]
columns = ["Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# Complex filtering using AND and OR
filtered_df = df.filter((df["Age"] > 30) & (df["Department"] == "IT") | (df["Salary"] > 4500))
filtered_df.show()
এখানে, AND এবং OR অপারেটর ব্যবহার করে একটি complex condition তৈরি করা হয়েছে, যেখানে Age > 30 এবং Department == "IT" অথবা Salary > 4500 শর্ত পূরণ করলে রেকর্ড ফিরিয়ে আনা হবে।
২. Filtering with NULL Values
Spark SQL-এ NULL মানগুলির জন্য বিশেষভাবে ফিল্টারিং করা যায়। আপনি isNull() এবং isNotNull() মেথড ব্যবহার করে NULL এবং non-null মানের জন্য ফিল্টার করতে পারেন।
উদাহরণ:
# Filter rows where salary is NULL
df.filter(df["Salary"].isNull()).show()
# Filter rows where salary is NOT NULL
df.filter(df["Salary"].isNotNull()).show()
এখানে, প্রথম কোয়ারি এমন রেকর্ড ফিল্টার করবে যেগুলোর Salary কলামে NULL মান রয়েছে এবং দ্বিতীয় কোয়ারি ফিল্টার করবে যেখানে Salary কলামে কোনো NULL মান নেই।
৩. Filtering with Regular Expressions (Regex)
Spark SQL-এ regular expressions ব্যবহার করে টেক্সট ফিল্টারিং করা যায়। rlike() মেথড ব্যবহার করে আপনি একটি নির্দিষ্ট প্যাটার্নের সাথে মেলানো রেকর্ড নির্বাচন করতে পারেন।
উদাহরণ:
# Filtering using regular expression
df.filter(df["Name"].rlike("^A")).show() # Names starting with "A"
এখানে, ^A রেগুলার এক্সপ্রেশন ব্যবহার করে Name কলামের সমস্ত রেকর্ড ফিল্টার করা হচ্ছে যেগুলি "A" দিয়ে শুরু হয়।
৪. Filtering on Multiple Columns
Spark SQL-এ একাধিক কলামের ভিত্তিতে ফিল্টারিং করা অনেকটা সাধারণ, এবং এটি বিভিন্ন শর্তের ভিত্তিতে একসাথে কাজ করতে সক্ষম।
উদাহরণ:
# Filtering on multiple columns
df.filter((df["Age"] > 30) & (df["Department"] == "IT") & (df["Salary"] > 4000)).show()
এখানে, Age > 30, Department == "IT", এবং Salary > 4000 শর্তগুলো একসাথে ব্যবহার করা হয়েছে, এবং শুধুমাত্র সেই রেকর্ডগুলো ফিল্টার করা হয়েছে যা সমস্ত শর্ত পূরণ করে।
৫. Using SQL Expressions for Complex Filtering
Spark SQL-এ আপনি SQL কোয়ারি ব্যবহার করে DataFrame-এ আরও জটিল ফিল্টারিং করতে পারেন। SparkSession এর মাধ্যমে SQL কোয়ারি লিখে আপনি আরও সহজে এবং উচ্চতর কাস্টমাইজেশন সহ ফিল্টারিং করতে পারবেন।
উদাহরণ:
# Creating a temporary view for SQL queries
df.createOrReplaceTempView("employee_data")
# Using SQL query for filtering
result = spark.sql("SELECT * FROM employee_data WHERE Age > 30 AND Department = 'IT' AND Salary > 4500")
result.show()
এখানে, SQL কোয়ারি ব্যবহার করে Age, Department, এবং Salary কলামগুলি শর্তযুক্তভাবে ফিল্টার করা হয়েছে।
৬. Dynamic Filtering using User-Defined Conditions
Spark SQL-এ ডাইনামিক শর্তের মাধ্যমে DataFrame ফিল্টার করা যায়। আপনি একটি কাস্টম শর্ত তৈরি করে ফিল্টার করতে পারেন, যা ব্যবহারকারীর ইনপুট বা অন্য ডেটা সোর্স থেকে আসে।
উদাহরণ:
# Dynamic Filtering based on user input
age_filter = 30
salary_filter = 4500
df.filter((df["Age"] > age_filter) & (df["Salary"] > salary_filter)).show()
এখানে, age_filter এবং salary_filter ভেরিয়েবল দিয়ে শর্ত তৈরি করা হয়েছে, যা ব্যবহারকারী ইনপুট বা অন্য কোনো সূত্র থেকে আসতে পারে।
৭. Filtering with isin() for Multiple Values
isin() ফাংশন ব্যবহার করে আপনি একাধিক মানের জন্য ফিল্টারিং করতে পারেন, যা সাধারণত একটি নির্দিষ্ট সেটের মধ্যে থাকা ডেটা নির্বাচন করতে ব্যবহৃত হয়।
উদাহরণ:
# Filtering with isin for multiple values
df.filter(df["Department"].isin("HR", "IT")).show()
এখানে, isin() ব্যবহার করে শুধুমাত্র সেই রেকর্ডগুলো ফিল্টার করা হচ্ছে যেগুলির Department কলামের মান HR বা IT।
৮. Combining Filters with select()
Spark SQL-এ আপনি একাধিক ফিল্টার শর্ত এবং select() ব্যবহার করে DataFrame এর নির্দিষ্ট কলামগুলো নির্বাচন করতে পারেন। এটি নির্দিষ্ট ডেটা সংক্ষেপে এবং দ্রুত খুঁজে পেতে সাহায্য করে।
উদাহরণ:
# Combining filters with select
df.filter((df["Age"] > 30) & (df["Salary"] > 4000)) \
.select("Name", "Department", "Salary") \
.show()
এখানে, ফিল্টারিং এর পর শুধুমাত্র Name, Department, এবং Salary কলামগুলো নির্বাচন করা হয়েছে।
সারাংশ
Spark SQL-এ Advanced Filtering Techniques ব্যবহার করে আপনি সহজেই DataFrame বা SQL টেবিল থেকে জটিল শর্তে ডেটা ফিল্টার করতে পারেন। Complex conditions, NULL value filtering, regular expressions, এবং multiple columns filtering এর মাধ্যমে আপনি ডেটার উপর উচ্চতর কাস্টমাইজড শর্ত প্রয়োগ করতে পারেন। এছাড়া, SQL expressions, dynamic filtering, এবং isin() ব্যবহার করে ডেটা দ্রুত এবং কার্যকরীভাবে ফিল্টার করা সম্ভব। Spark SQL-এ এসব কৌশলগুলি ব্যবহার করে আপনি বড় ডেটাসেটের মধ্যে থেকে দ্রুত এবং দক্ষতার সাথে প্রয়োজনীয় ডেটা খুঁজে বের করতে পারবেন।
Spark SQL-এ Nested Transformations হল এমন ট্রান্সফর্মেশন যা একাধিক স্টেপে ডেটা প্রক্রিয়া করে, যেখানে একটি ট্রান্সফর্মেশন অন্য ট্রান্সফর্মেশন বা অপারেশনের মধ্যে থাকে। Spark SQL এবং DataFrame/Dataset API-তে Nested Transformations ব্যবহার করে ডেটাকে আরও জটিলভাবে প্রক্রিয়া করা সম্ভব। Nested Transformations অত্যন্ত শক্তিশালী টুল যা ডেটাকে filter, map, aggregate ইত্যাদি ধাপে ধাপে প্রক্রিয়া করতে সাহায্য করে।
এই টিউটোরিয়ালে আমরা Nested Transformations এর বিভিন্ন উদাহরণ এবং তাদের পারফরম্যান্স ইমপ্যাক্ট সম্পর্কে আলোচনা করব।
১. Nested Transformations এর ব্যবহার
Nested Transformations সাধারণত একটি স্টেপে একটি ট্রান্সফর্মেশন অ্যাপ্লাই করার পর তার উপর পরবর্তী ট্রান্সফর্মেশন অ্যাপ্লাই করে। এই ধরনের ট্রান্সফর্মেশন ডেটা শেপিং এবং ক্লিনিং-এ অত্যন্ত কার্যকরী হতে পারে।
উদাহরণ ১: DataFrame-এ Nested map এবং filter ব্যবহার
# SparkSession তৈরি
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Nested Transformations Example").getOrCreate()
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30), ("David", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Nested Transformation: filter followed by map
df_transformed = df.filter(df['age'] > 25).rdd.map(lambda x: (x[0], x[1] + 1))
df_transformed.collect()
এখানে, প্রথমে filter ট্রান্সফর্মেশন ব্যবহার করে বয়স ২৫ এর বেশি এমন রেকর্ডগুলোকে ফিল্টার করা হয়েছে, এবং পরে map ট্রান্সফর্মেশন ব্যবহার করে তাদের বয়সে ১ যোগ করা হয়েছে। এই দুটি ট্রান্সফর্মেশন nested হয়ে একসাথে কাজ করেছে।
আউটপুট:
[('Alice', 29), ('Charlie', 31), ('David', 36)]
এখানে, filter এবং map একত্রে ব্যবহার করা হয়েছে, যাতে প্রথমে ফিল্টার করা হয় এবং তারপর সেটির উপর একটি নতুন মান (age + 1) অ্যাসাইন করা হয়।
২. Nested Aggregations
Nested Aggregations হল এমন একটি কৌশল যেখানে একাধিক গ্রুপিং বা অ্যাগ্রিগেট অপারেশন একে অপরের ভিতরে চলে। এটি বড় ডেটাসেটের উপর জটিল গাণিতিক পরিসংখ্যান বের করার জন্য ব্যবহৃত হয়, যেমন গড়, সর্বোচ্চ, সর্বনিম্ন, গুণফল ইত্যাদি।
উদাহরণ ২: Nested groupBy এবং agg ব্যবহার
# DataFrame তৈরি
data = [("Alice", "HR", 1000), ("Bob", "HR", 1500), ("Charlie", "Finance", 2000),
("David", "Finance", 2500), ("Eva", "HR", 1200)]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Nested aggregation: প্রথমে department অনুযায়ী গ্রুপিং তারপর salary এর গড় বের করা
result = df.groupBy("department").agg(
{"salary": "avg"}).alias("avg_salary")
result.show()
এখানে, প্রথমে groupBy("department") ব্যবহার করে ডেটাকে বিভাগ অনুযায়ী গ্রুপ করা হয়েছে এবং তারপর সেই গ্রুপের salary কলামের গড় (average) বের করার জন্য agg ফাংশন ব্যবহার করা হয়েছে।
আউটপুট:
+---------+-----------+
|department|avg(salary)|
+---------+-----------+
|HR |1233.33 |
|Finance |2250.00 |
+---------+-----------+
এখানে, groupBy এবং agg ব্যবহার করে দুটি ট্রান্সফর্মেশন nested হয়ে কাজ করেছে।
৩. Nested withColumn and select Transformations
withColumn এবং select হলো Spark SQL-এর অন্যতম গুরুত্বপূর্ণ ট্রান্সফর্মেশন। এই দুটি ট্রান্সফর্মেশন একে অপরের মধ্যে nested ভাবে ব্যবহার করা যেতে পারে। উদাহরণস্বরূপ, আপনি withColumn ব্যবহার করে নতুন কলাম তৈরি করতে পারেন এবং তারপরে select ব্যবহার করে নির্দিষ্ট কলাম নির্বাচন করতে পারেন।
উদাহরণ ৩: Nested withColumn এবং select ব্যবহার
# DataFrame তৈরি
data = [("Alice", 28), ("Bob", 25), ("Charlie", 30), ("David", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Nested Transformation: withColumn followed by select
df_transformed = df.withColumn("age_plus_ten", df['age'] + 10).select("name", "age_plus_ten")
df_transformed.show()
এখানে, withColumn ব্যবহার করে একটি নতুন কলাম age_plus_ten তৈরি করা হয়েছে, যা age কলামের উপর ১০ যোগ করে। তারপর select ব্যবহার করে নির্দিষ্ট কলাম নির্বাচন করা হয়েছে।
আউটপুট:
+-------+-----------+
| name|age_plus_ten|
+-------+-----------+
| Alice| 38|
| Bob| 35|
|Charlie| 40|
| David| 45|
+-------+-----------+
এখানে, withColumn এবং select দুটি ট্রান্সফর্মেশন nested হয়ে কাজ করেছে, প্রথমে একটি নতুন কলাম তৈরি করা হয়েছে এবং পরে নির্দিষ্ট কলামগুলো নির্বাচন করা হয়েছে।
৪. Chaining Multiple Transformations
Spark SQL-এ multiple transformations chaining করা খুবই সাধারণ, যেখানে একাধিক ট্রান্সফর্মেশন একের পর এক ডেটার উপর প্রয়োগ করা হয়। উদাহরণস্বরূপ, প্রথমে ফিল্টারিং, তারপর গ্রুপিং, তারপর অগ্রগতি।
উদাহরণ ৪: Chaining filter, groupBy, এবং agg Transformations
# DataFrame তৈরি
data = [("Alice", "HR", 1000), ("Bob", "HR", 1500), ("Charlie", "Finance", 2000),
("David", "Finance", 2500), ("Eva", "HR", 1200)]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Chained transformations: filter -> groupBy -> agg
result = df.filter(df["salary"] > 1500).groupBy("department").agg({"salary": "avg"}).alias("avg_salary")
result.show()
এখানে, একসাথে filter, groupBy, এবং agg ট্রান্সফর্মেশনগুলি chaining করা হয়েছে। প্রথমে বেতন ১৫০০ এর বেশি এমন ব্যক্তিদের ফিল্টার করা হয়েছে, তারপর তাদের ডিপার্টমেন্ট অনুযায়ী গ্রুপিং করা হয়েছে এবং শেষে salary এর গড় বের করা হয়েছে।
আউটপুট:
+---------+-----------+
|department|avg(salary)|
+---------+-----------+
|Finance |2250.00 |
+---------+-----------+
এখানে, একাধিক ট্রান্সফর্মেশন chaining করা হয়েছে যা ডেটাকে প্রসেস করার জন্য একটি সংযুক্ত পদ্ধতি প্রদান করেছে।
সারাংশ
Nested Transformations Spark SQL-এ একাধিক ট্রান্সফর্মেশন একসাথে চালানোর একটি শক্তিশালী উপায়। map, flatMap, filter, groupBy, withColumn, select, এবং agg এর মতো ট্রান্সফর্মেশনগুলি একে অপরের মধ্যে nestedভাবে প্রয়োগ করা যায়। এসব Nested Transformations ব্যবহার করে ডেটাকে জটিলভাবে প্রক্রিয়া করা সম্ভব এবং এতে ডেটার পরিষ্কারকরণ, অ্যানালাইসিস এবং অন্যান্য অপারেশনগুলো দ্রুত করা যায়।
Read more