Big Data and Analytics Scalar এবং Aggregation UDFs তৈরি করা গাইড ও নোট

324

User-Defined Functions (UDFs) হল Spark SQL-এর একটি শক্তিশালী ফিচার যা ব্যবহারকারীদের নিজস্ব কাস্টম ফাংশন তৈরি এবং SQL কোয়ারি বা DataFrame API তে প্রয়োগ করার সুযোগ দেয়। Spark SQL এ দুটি প্রধান ধরনের UDFs ব্যবহৃত হয়:

  • Scalar UDFs: স্কেলার মান (single value) ফেরত দেয় এমন ফাংশন, যা প্রতিটি রেকর্ড বা সারির উপর কাজ করে।
  • Aggregation UDFs: এই ধরনের UDFs ডেটার গ্রুপে অ্যাগ্রিগেটিভ অপারেশন (যেমন, SUM, AVG) প্রয়োগ করে, যা একাধিক রেকর্ড বা সারির উপর কাজ করে।

এখানে আমরা দুটি ধরনের UDF তৈরি করার প্রক্রিয়া এবং উদাহরণ দেখবো।


১. Scalar UDF (User-Defined Function)

Scalar UDF হলো এমন একটি কাস্টম ফাংশন যা একটি একক মান (single value) রিটার্ন করে এবং একটি কলাম বা সারির উপর কাজ করে। এই ধরনের ফাংশন সাধারণত DataFrame বা SQL কোয়ারি তে ব্যবহার করা হয়, যেখানে প্রতিটি রেকর্ডে একটি কাস্টম লজিক প্রয়োগ করতে হয়।

Scalar UDF তৈরি এবং ব্যবহারের উদাহরণ

ধরা যাক, আমাদের একটি DataFrame আছে যেখানে গ্রাহকের বয়স রয়েছে এবং আমরা একটি কাস্টম ফাংশন তৈরি করতে চাই যা গ্রাহকের বয়সের উপর ভিত্তি করে তার জীবনসঙ্গীর যোগ্যতা নির্ধারণ করবে। উদাহরণস্বরূপ, যদি বয়স ২৫-এর বেশি হয়, তাহলে "Eligible" হবে, নাহলে "Not Eligible" হবে।

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# SparkSession তৈরি
spark = SparkSession.builder \
    .appName("Scalar UDF Example") \
    .getOrCreate()

# উদাহরণ DataFrame তৈরি
data = [("John", 28), ("Alice", 22), ("Bob", 30)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# Scalar UDF তৈরি করা
def eligibility(age):
    if age > 25:
        return "Eligible"
    else:
        return "Not Eligible"

# UDF নিবন্ধন করা
eligibility_udf = udf(eligibility, StringType())

# UDF ব্যবহার করে নতুন কলাম তৈরি করা
df_with_eligibility = df.withColumn("eligibility", eligibility_udf(df["age"]))

df_with_eligibility.show()

আউটপুট:

+-----+---+-----------+
| name|age|eligibility|
+-----+---+-----------+
| John| 28|   Eligible|
|Alice| 22|Not Eligible|
|  Bob| 30|   Eligible|
+-----+---+-----------+

এখানে, Scalar UDF eligibility তৈরি করা হয়েছে যা age কলামের ভিত্তিতে "Eligible" বা "Not Eligible" মান রিটার্ন করছে এবং সেই মানটি DataFrame-এ একটি নতুন কলাম হিসেবে যুক্ত করা হয়েছে।


২. Aggregation UDF (User-Defined Function)

Aggregation UDF হলো এমন একটি কাস্টম ফাংশন যা একাধিক মান (multiple values) নিয়ে কাজ করে এবং একটি অ্যাগ্রিগেটিভ মান (যেমন: গড়, যোগফল, সর্বোচ্চ) রিটার্ন করে। এই ধরনের UDFs সাধারণত গ্রুপ বাই (GROUP BY) অপারেশন অথবা অ্যাগ্রিগেটিভ ফাংশনগুলোতে ব্যবহৃত হয়।

Aggregation UDF তৈরি এবং ব্যবহারের উদাহরণ

ধরা যাক, আমাদের একটি DataFrame আছে যেখানে গ্রাহকের নাম এবং তার মাসিক আয়ের তথ্য রয়েছে এবং আমরা একটি কাস্টম অ্যাগ্রিগেটিভ ফাংশন তৈরি করতে চাই যা গ্রাহকের মাসিক আয়ের গড় (average) নির্ধারণ করবে।

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, avg
from pyspark.sql.types import DoubleType

# SparkSession তৈরি
spark = SparkSession.builder \
    .appName("Aggregation UDF Example") \
    .getOrCreate()

# উদাহরণ DataFrame তৈরি
data = [("John", 3000), ("Alice", 4000), ("Bob", 5000), ("John", 3500), ("Alice", 4200)]
columns = ["name", "salary"]
df = spark.createDataFrame(data, columns)

# Aggregation UDF তৈরি করা (গড় নির্ধারণ)
def avg_salary(salaries):
    return sum(salaries) / len(salaries)

# UDF নিবন্ধন করা
avg_salary_udf = udf(avg_salary, DoubleType())

# গ্রুপ বাই (GROUP BY) এবং Aggregation UDF ব্যবহার করা
from pyspark.sql import functions as F
aggregated_df = df.groupBy("name").agg(F.collect_list("salary").alias("salaries"))

# UDF প্রয়োগ করা
result_df = aggregated_df.withColumn("avg_salary", avg_salary_udf(aggregated_df["salaries"]))

result_df.show()

আউটপুট:

+-----+------------+----------+
| name|     salaries|avg_salary|
+-----+------------+----------+
|John | [3000, 3500]|   3250.0 |
|Alice| [4000, 4200]|   4100.0 |
|  Bob|        [5000]|   5000.0 |
+-----+------------+----------+

এখানে, Aggregation UDF avg_salary তৈরি করা হয়েছে যা salaries কলামের একটি লিস্ট নেয় এবং তার গড় (average) হিসাব করে রিটার্ন করে। এই UDFটি groupBy অপারেশনের সাথে ব্যবহৃত হয়েছে এবং গ্রুপভিত্তিক গড় আয়ের মান বের করা হয়েছে।


৩. Performance Optimization for UDFs

UDFs (Scalar এবং Aggregation) ব্যবহারের সময় কিছু পারফরম্যান্স অপটিমাইজেশন করার প্রয়োজন হতে পারে:

  • Columnar Format: UDF ব্যবহার করার আগে ডেটা কে Parquet বা ORC ফরম্যাটে সংরক্ষণ করুন। এগুলি কম্প্রেসড এবং দ্রুত অ্যাক্সেসযোগ্য ফরম্যাট, যা UDF ব্যবহারকে দ্রুত করতে সাহায্য করবে।
  • Avoid using UDFs when built-in functions are available: Spark SQL অনেক built-in functions (যেমন avg, sum, max) প্রদান করে। UDF ব্যবহার করা তখনই উচিত যখন built-in functions আপনার চাহিদা পূরণ করতে না পারে।
  • Use Pandas UDFs (Vectorized UDFs): Spark 3.x থেকে Pandas UDFs বা Vectorized UDFs সমর্থন রয়েছে, যা প্যান্ডাস সিরিজ বা ডেটাফ্রেমে কাজ করে এবং পারফরম্যান্স অনেক উন্নত করতে পারে। এতে একাধিক রেকর্ড একসাথে প্রসেস হয়।

সারাংশ

Scalar UDFs এবং Aggregation UDFs হল Spark SQL-এ কাস্টম লজিক প্রয়োগের শক্তিশালী পদ্ধতি। Scalar UDFs একটি একক মান রিটার্ন করে এবং প্রতিটি সারির উপর কাজ করে, যখন Aggregation UDFs একাধিক রেকর্ডের উপর কাজ করে এবং অ্যাগ্রিগেটিভ মান রিটার্ন করে। Spark SQL-এ এই ধরনের UDFs তৈরি এবং প্রয়োগ করার মাধ্যমে আপনি আরও কাস্টম এবং জটিল লজিক প্রয়োগ করতে পারেন, তবে এর পারফরম্যান্স অপটিমাইজেশন এবং built-in ফাংশন ব্যবহারের ক্ষেত্রে সতর্কতা অবলম্বন করা উচিত।

Content added By
Promotion

Are you sure to start over?

Loading...