Big Data and Analytics RDD এবং DataFrame এর জন্য Transformations গাইড ও নোট

401

Apache Spark একটি শক্তিশালী ডিস্ট্রিবিউটেড ডেটা প্রসেসিং ফ্রেমওয়ার্ক যা RDD (Resilient Distributed Dataset) এবং DataFrame ব্যবহার করে ডেটা ট্রান্সফরমেশন এবং অ্যানালাইসিস সহজ করে তোলে। এই দুটি ডেটা স্ট্রাকচার স্পার্কের সবচেয়ে গুরুত্বপূর্ণ উপাদান। স্পার্কে Transformations হল সেই ফাংশন যা ডেটার মধ্যে পরিবর্তন এনে নতুন ডেটা স্ট্রাকচার তৈরি করে, কিন্তু এগুলি lazy evaluation এর উপর কাজ করে, অর্থাৎ ট্রান্সফরমেশনগুলো তখনই কার্যকরী হয় যখন অ্যাকশন ফাংশন (যেমন collect(), count()) ব্যবহার করা হয়।

এই টিউটোরিয়ালে, আমরা RDD এবং DataFrame এর জন্য কিছু সাধারণ Transformations ফাংশন নিয়ে আলোচনা করব।


RDD Transformations

RDD Transformations হল সেই অপারেশন যা RDD এর উপরে নতুন RDD তৈরি করতে ব্যবহৃত হয়। RDD ট্রান্সফরমেশনগুলি immutable (অপরিবর্তনীয়) এবং lazy হয়, অর্থাৎ শুধুমাত্র যখন আপনি একটি action (যেমন collect(), count()) চালান তখনই ট্রান্সফরমেশন কার্যকরী হয়।

Common RDD Transformations

  1. map()

map() ট্রান্সফরমেশনটি RDD এর প্রতিটি উপাদানের উপর একটি ফাংশন প্রয়োগ করে এবং একটি নতুন RDD তৈরি করে। এটি একটি এলিমেন্ট থেকে অন্য এলিমেন্টে ম্যাপিং করে।

Example:
from pyspark import SparkContext

sc = SparkContext("local", "Map Example")
rdd = sc.parallelize([1, 2, 3, 4, 5])

# map() ফাংশন ব্যবহার করে প্রতিটি উপাদানে 2 গুণ করা হচ্ছে
result = rdd.map(lambda x: x * 2)

print(result.collect())  # Output: [2, 4, 6, 8, 10]

এখানে:

  • map() ফাংশনটি প্রতিটি উপাদানকে 2 গুণ করেছে এবং একটি নতুন RDD তৈরি করেছে।

  1. filter()

filter() ট্রান্সফরমেশনটি RDD থেকে এমন উপাদানগুলো বেছে নেয় যা একটি নির্দিষ্ট শর্ত পূর্ণ করে। এটি একটি নতুন RDD তৈরি করে যা কেবলমাত্র শর্ত পূর্ণ করা উপাদানগুলিকে ধারণ করে।

Example:
rdd = sc.parallelize([1, 2, 3, 4, 5])

# filter() ফাংশন ব্যবহার করে এমন উপাদানগুলো বেছে নেওয়া হচ্ছে যা 3 এর বেশি
result = rdd.filter(lambda x: x > 3)

print(result.collect())  # Output: [4, 5]

এখানে:

  • filter() ফাংশনটি 3 এর চেয়ে বড় উপাদানগুলো নির্বাচন করেছে।

  1. flatMap()

flatMap() একটি শক্তিশালী ট্রান্সফরমেশন, যা একটি এলিমেন্ট থেকে একাধিক এলিমেন্ট উৎপন্ন করতে ব্যবহৃত হয়। এটি flatMap ফাংশনের মাধ্যমে একটি লিস্ট বা অন্য কোনো ডেটা স্ট্রাকচার তৈরি করে, যা সাধারণত flat করা হয়।

Example:
rdd = sc.parallelize(["hello world", "apache spark"])

# flatMap() ফাংশন ব্যবহার করে শব্দগুলিকে পৃথক করা হচ্ছে
result = rdd.flatMap(lambda x: x.split())

print(result.collect())  # Output: ['hello', 'world', 'apache', 'spark']

এখানে:

  • flatMap() ফাংশনটি একটি লিস্টের মধ্যে থাকা শব্দগুলো পৃথক করেছে এবং একটি flat লিস্ট তৈরি করেছে।

  1. union()

union() ট্রান্সফরমেশনটি দুটি RDD এর একত্রিত করে একটি নতুন RDD তৈরি করে। এটি একাধিক RDD এর উপাদানগুলো একত্রে জমা করে।

Example:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])

# union() ফাংশন দুটি RDD একত্রিত করতে ব্যবহৃত হচ্ছে
result = rdd1.union(rdd2)

print(result.collect())  # Output: [1, 2, 3, 4, 5, 6]

এখানে:

  • union() ফাংশনটি দুটি RDD এর উপাদানগুলো একত্রিত করেছে।

  1. distinct()

distinct() ট্রান্সফরমেশনটি একটি RDD থেকে ডুপ্লিকেট উপাদানগুলো সরিয়ে দিয়ে একমাত্র ইউনিক (unique) উপাদানগুলো রেখে নতুন RDD তৈরি করে।

Example:
rdd = sc.parallelize([1, 1, 2, 3, 3, 4])

# distinct() ফাংশনটি ডুপ্লিকেট উপাদানগুলো সরিয়ে ফেলবে
result = rdd.distinct()

print(result.collect())  # Output: [1, 2, 3, 4]

এখানে:

  • distinct() ফাংশনটি সমস্ত ডুপ্লিকেট মান সরিয়ে দিয়ে একটি নতুন RDD তৈরি করেছে।

DataFrame Transformations

DataFrame হল স্পার্কের আরো উন্নত ডেটা স্ট্রাকচার, যা RDD এর উপর ভিত্তি করে তৈরি, কিন্তু এটি আরও বেশি স্ট্রাকচারড এবং SQL-এর মতো কুয়েরি ব্যবস্থাপনা প্রদান করে। DataFrame এর ট্রান্সফরমেশনগুলি lazy হয় এবং ডেটার উপর বিভিন্ন কার্যকরী অপারেশন সম্পাদন করে।

Common DataFrame Transformations

  1. select()

select() ফাংশনটি DataFrame থেকে নির্দিষ্ট কলাম নির্বাচন করতে ব্যবহৃত হয়। এটি একটি নতুন DataFrame তৈরি করে যা কেবল নির্বাচিত কলামগুলো ধারণ করে।

Example:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Select Example").getOrCreate()

# DataFrame তৈরি করা হচ্ছে
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
df = spark.createDataFrame(data, ["Name", "Age"])

# select() ফাংশন ব্যবহার করে নির্দিষ্ট কলাম নির্বাচন করা হচ্ছে
result = df.select("Name")

result.show()

Output:

+-----+
| Name|
+-----+
|Alice|
|  Bob|
|Cathy|
+-----+

এখানে:

  • select() ফাংশনটি Name কলামটি নির্বাচন করেছে।

  1. filter()

filter() DataFrame থেকে একটি শর্ত অনুযায়ী রেকর্ড নির্বাচন করতে ব্যবহৃত হয়। এটি SQL-এর WHERE ক্লজের মতো কাজ করে।

Example:
df = spark.createDataFrame(data, ["Name", "Age"])

# filter() ফাংশন ব্যবহার করে Age > 28 এমন রেকর্ড বাছাই করা হচ্ছে
result = df.filter(df.Age > 28)

result.show()

Output:

+----+---+
|Name|Age|
+----+---+
| Bob| 30|
+----+---+

এখানে:

  • filter() ফাংশনটি Age > 28 এমন রেকর্ড বেছে নিয়েছে।

  1. groupBy()

groupBy() ফাংশনটি একটি বা একাধিক কলামকে গ্রুপিং করতে ব্যবহৃত হয়। এর মাধ্যমে ডেটাকে গ্রুপ করে পরবর্তী সময়ে অ্যাগ্রিগেশন (যেমন, গড়, সমষ্টি, সর্বাধিক) করা যায়।

Example:
df = spark.createDataFrame([("Alice", 25), ("Bob", 30), ("Alice", 28)], ["Name", "Age"])

# groupBy() ফাংশন ব্যবহার করে Name অনুসারে গ্রুপ করা হচ্ছে
result = df.groupBy("Name").avg("Age")

result.show()

Output:

+-----+--------+
| Name|avg(Age)|
+-----+--------+
| Alice|    26.5|
|  Bob|    30.0|
+-----+--------+

এখানে:

  • groupBy() ফাংশনটি Name অনুসারে ডেটা গ্রুপ করেছে এবং প্রতিটি গ্রুপের গড় বয়স হিসাব করেছে।

  1. withColumn()

withColumn() ফাংশনটি DataFrame এ নতুন কলাম যোগ করার জন্য ব্যবহৃত হয় বা পুরনো কলামটি আপডেট করার জন্য।

Example:
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["Name", "Age"])

# withColumn() ফাংশন ব্যবহার করে Age এর উপর ভিত্তি করে একটি নতুন কলাম যোগ করা হচ্ছে
result = df.withColumn("AgePlusOne", df.Age + 1)

result.show()

Output:

+-----+---+---------+
| Name|Age|AgePlusOne|
+-----+---+---------+
|Alice| 25|       26|
|  Bob| 30|       31|
+-----+---+---------+

এখানে:

  • withColumn() ফাংশনটি একটি নতুন কলাম AgePlusOne যোগ করেছে।

Conclusion

RDD এবং DataFrame এর জন্য Transformations হল স্পার্কের শক্তিশালী বৈশিষ্ট্য, যা ডেটার উপর বিভিন্ন অপারেশন সম্পাদন করতে সহায়ক। RDD Transformations যেমন map(), filter(), flatMap() এবং DataFrame Transformations যেমন select(), filter(), groupBy() স্পার্কের ডিস্ট্রিবিউটেড প্রসেসিং ফিচারগুলির জন্য খুবই গুরুত্বপূর্ণ। এই ট্রান্সফরমেশনগুলো ব্যবহার করে আপনি ডেটাকে সহজভাবে প্রসেস করতে পারেন এবং বিভিন্ন প্রয়োজনে নতুন ডেটা স্ট্রাকচার তৈরি করতে পারেন।

Content added By
Promotion

Are you sure to start over?

Loading...