Flink SQL ব্যবহার করে স্ট্রিম বা ব্যাচ ডেটা প্রসেসিং-এর জন্য SQL Queries লেখা যায়, যেগুলো ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট, এবং ট্রান্সফরম করতে পারে। নিচে কিছু সাধারণ উদাহরণসহ Flink SQL queries দেওয়া হলো:
1. সাধারণ নির্বাচন (SELECT) এবং ফিল্টারিং (WHERE)
এই কুয়েরি দিয়ে আপনি নির্দিষ্ট কলাম নির্বাচন করতে পারেন এবং শর্ত অনুযায়ী ফিল্টার করতে পারেন। উদাহরণস্বরূপ, আমরা একটি টেবিল থেকে নির্দিষ্ট টাইপের ইভেন্ট ফিল্টার করতে পারি।
SQL Query:
SELECT user_id, event_type, event_time
FROM events
WHERE event_type = 'login';
- events টেবিল থেকে
user_id,event_type, এবংevent_timeকলামগুলো নির্বাচন করা হয়েছে। - কুয়েরি শুধুমাত্র সেই রেকর্ডগুলো ফেরত দেবে যেখানে
event_typeহলো'login'।
2. GROUP BY এবং Aggregation (COUNT, SUM)
কোনো টেবিলের ডেটাকে গ্রুপ করে অ্যাগ্রিগেশন করা যেতে পারে। নিচের উদাহরণে, আমরা প্রতি user_id ভিত্তিতে ইভেন্টের সংখ্যা গণনা করছি।
SQL Query:
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;
- এখানে
eventsটেবিল থেকে প্রতিuser_idঅনুযায়ী ইভেন্ট গণনা করা হচ্ছে। COUNT(*)পুরো টেবিলের রেকর্ড সংখ্যা গণনা করে এবং প্রতিটিuser_idএর জন্য এটি ফেরত দেয়।
3. TUMBLE উইন্ডো ব্যবহার করে উইন্ডো অপারেশন
Flink SQL-এ উইন্ডো অপারেশন খুবই গুরুত্বপূর্ণ, বিশেষ করে স্ট্রিম প্রসেসিং-এর জন্য। নিচের উদাহরণে, ৫ মিনিটের টাম্বলিং উইন্ডোতে প্রতিটি event_type এর সংখ্যা গণনা করা হচ্ছে।
SQL Query:
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
event_type,
COUNT(*) AS event_count
FROM events
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTE),
event_type;
TUMBLE(event_time, INTERVAL '5' MINUTE)একটি ৫ মিনিটের টাম্বলিং উইন্ডো তৈরি করে।TUMBLE_STARTএবংTUMBLE_ENDউইন্ডোর শুরু এবং শেষ সময় ফেরত দেয়।event_typeঅনুযায়ী প্রতিটি উইন্ডোতে ইভেন্টের সংখ্যা গণনা করা হয়েছে।
4. HAVING Clause ব্যবহার করে Aggregation ফিল্টার করা
Flink SQL-এ HAVING clause ব্যবহার করে, আপনি গ্রুপ করা ডেটাতে শর্ত প্রয়োগ করতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে প্রতি user_id এর জন্য ইভেন্টের সংখ্যা ১০ এর বেশি হলে সেই রেকর্ডগুলো ফেরত দেয়া হয়েছে।
SQL Query:
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id
HAVING COUNT(*) > 10;
GROUP BYব্যবহার করে প্রতিটিuser_idঅনুযায়ী ইভেন্ট গুনে বের করা হয়েছে।HAVING COUNT(*) > 10এর মাধ্যমে শুধু সেইuser_idফেরত দেয়া হচ্ছে যাদের ইভেন্ট সংখ্যা ১০ এর বেশি।
5. SLIDING উইন্ডো ব্যবহার করে উইন্ডো অপারেশন
Sliding উইন্ডো দিয়ে নির্দিষ্ট সময়ের জন্য উইন্ডো তৈরি করা যায় যা নির্দিষ্ট সময় পর পর স্লাইড করে। নিচে একটি উদাহরণ দেয়া হলো, যেখানে ১০ মিনিটের উইন্ডো এবং ৫ মিনিটের স্লাইড ব্যবহার করা হয়েছে।
SQL Query:
SELECT
HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_start,
HOP_END(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_end,
event_type,
COUNT(*) AS event_count
FROM events
GROUP BY
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
event_type;
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)একটি ১০ মিনিটের উইন্ডো তৈরি করে, যা প্রতি ৫ মিনিট পর পর স্লাইড করে।HOP_STARTএবংHOP_ENDউইন্ডোর শুরুর এবং শেষের সময় দেখায়।
6. JOIN ব্যবহার করে টেবিল মেলানো
Flink SQL-এ আপনি বিভিন্ন টেবিলের মধ্যে JOIN ব্যবহার করে ডেটা মিলিয়ে দেখতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে orders এবং customers টেবিল যোগ করা হয়েছে।
SQL Query:
SELECT o.order_id, o.order_date, c.customer_name
FROM orders o
JOIN customers c
ON o.customer_id = c.customer_id;
- এখানে
ordersএবংcustomersটেবিলের মধ্যেcustomer_idকলামের উপর ভিত্তি করে যোগ করা হয়েছে। - এটি প্রতিটি অর্ডারের জন্য গ্রাহকের নাম এবং অর্ডার আইডি ফেরত দেয়।
7. পানি চিহ্ন (WATERMARK) ব্যবহার করে ইভেন্ট টাইম প্রসেসিং
Flink SQL-এ WATERMARK ব্যবহার করে ইভেন্ট টাইমের উপর ভিত্তি করে প্রসেসিং করা যায়, যা লেট ইভেন্টগুলো হ্যান্ডেল করতে সাহায্য করে। নিচে একটি টেবিলের উদাহরণ দেয়া হলো যেখানে event_time এর উপর ভিত্তি করে watermark তৈরি করা হয়েছে।
SQL Query:
CREATE TABLE events (
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'event-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
- এই টেবিল
Kafkaথেকে ডেটা পড়ে এবংevent_timeকলামের উপর ভিত্তি করে watermark তৈরি করে। WATERMARK FOR event_time AS event_time - INTERVAL '2' SECONDএর মাধ্যমে ইভেন্ট টাইম থেকে ২ সেকেন্ড পিছিয়ে watermark সেট করা হয়েছে।
উপসংহার
Flink SQL দিয়ে আপনি বিভিন্ন ধরনের query চালাতে পারেন, যা স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং-এর জন্য খুবই কার্যকর। এর মাধ্যমে ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট এবং উইন্ডো প্রসেসিং সহজে করা যায়। Flink SQL-এ ডেটাবেজের মতো টেবিল তৈরি করা, সেগুলোর মধ্যে সম্পর্ক তৈরি করা এবং কাস্টম উইন্ডো এবং ইভেন্ট টাইম প্রসেসিং করার মাধ্যমে ডেটা বিশ্লেষণকে সহজ এবং কার্যকর করা যায়।
Read more