Flink SQL ব্যবহার করে স্ট্রিম বা ব্যাচ ডেটা প্রসেসিং-এর জন্য SQL Queries লেখা যায়, যেগুলো ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট, এবং ট্রান্সফরম করতে পারে। নিচে কিছু সাধারণ উদাহরণসহ Flink SQL queries দেওয়া হলো:

1. সাধারণ নির্বাচন (SELECT) এবং ফিল্টারিং (WHERE)

এই কুয়েরি দিয়ে আপনি নির্দিষ্ট কলাম নির্বাচন করতে পারেন এবং শর্ত অনুযায়ী ফিল্টার করতে পারেন। উদাহরণস্বরূপ, আমরা একটি টেবিল থেকে নির্দিষ্ট টাইপের ইভেন্ট ফিল্টার করতে পারি।

SQL Query:

SELECT user_id, event_type, event_time
FROM events
WHERE event_type = 'login';
  • events টেবিল থেকে user_id, event_type, এবং event_time কলামগুলো নির্বাচন করা হয়েছে।
  • কুয়েরি শুধুমাত্র সেই রেকর্ডগুলো ফেরত দেবে যেখানে event_type হলো 'login'

2. GROUP BY এবং Aggregation (COUNT, SUM)

কোনো টেবিলের ডেটাকে গ্রুপ করে অ্যাগ্রিগেশন করা যেতে পারে। নিচের উদাহরণে, আমরা প্রতি user_id ভিত্তিতে ইভেন্টের সংখ্যা গণনা করছি।

SQL Query:

SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;
  • এখানে events টেবিল থেকে প্রতি user_id অনুযায়ী ইভেন্ট গণনা করা হচ্ছে।
  • COUNT(*) পুরো টেবিলের রেকর্ড সংখ্যা গণনা করে এবং প্রতিটি user_id এর জন্য এটি ফেরত দেয়।

3. TUMBLE উইন্ডো ব্যবহার করে উইন্ডো অপারেশন

Flink SQL-এ উইন্ডো অপারেশন খুবই গুরুত্বপূর্ণ, বিশেষ করে স্ট্রিম প্রসেসিং-এর জন্য। নিচের উদাহরণে, ৫ মিনিটের টাম্বলিং উইন্ডোতে প্রতিটি event_type এর সংখ্যা গণনা করা হচ্ছে।

SQL Query:

SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
    event_type,
    COUNT(*) AS event_count
FROM events
GROUP BY
    TUMBLE(event_time, INTERVAL '5' MINUTE),
    event_type;
  • TUMBLE(event_time, INTERVAL '5' MINUTE) একটি ৫ মিনিটের টাম্বলিং উইন্ডো তৈরি করে।
  • TUMBLE_START এবং TUMBLE_END উইন্ডোর শুরু এবং শেষ সময় ফেরত দেয়।
  • event_type অনুযায়ী প্রতিটি উইন্ডোতে ইভেন্টের সংখ্যা গণনা করা হয়েছে।

4. HAVING Clause ব্যবহার করে Aggregation ফিল্টার করা

Flink SQL-এ HAVING clause ব্যবহার করে, আপনি গ্রুপ করা ডেটাতে শর্ত প্রয়োগ করতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে প্রতি user_id এর জন্য ইভেন্টের সংখ্যা ১০ এর বেশি হলে সেই রেকর্ডগুলো ফেরত দেয়া হয়েছে।

SQL Query:

SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id
HAVING COUNT(*) > 10;
  • GROUP BY ব্যবহার করে প্রতিটি user_id অনুযায়ী ইভেন্ট গুনে বের করা হয়েছে।
  • HAVING COUNT(*) > 10 এর মাধ্যমে শুধু সেই user_id ফেরত দেয়া হচ্ছে যাদের ইভেন্ট সংখ্যা ১০ এর বেশি।

5. SLIDING উইন্ডো ব্যবহার করে উইন্ডো অপারেশন

Sliding উইন্ডো দিয়ে নির্দিষ্ট সময়ের জন্য উইন্ডো তৈরি করা যায় যা নির্দিষ্ট সময় পর পর স্লাইড করে। নিচে একটি উদাহরণ দেয়া হলো, যেখানে ১০ মিনিটের উইন্ডো এবং ৫ মিনিটের স্লাইড ব্যবহার করা হয়েছে।

SQL Query:

SELECT
    HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_start,
    HOP_END(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_end,
    event_type,
    COUNT(*) AS event_count
FROM events
GROUP BY
    HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
    event_type;
  • HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) একটি ১০ মিনিটের উইন্ডো তৈরি করে, যা প্রতি ৫ মিনিট পর পর স্লাইড করে।
  • HOP_START এবং HOP_END উইন্ডোর শুরুর এবং শেষের সময় দেখায়।

6. JOIN ব্যবহার করে টেবিল মেলানো

Flink SQL-এ আপনি বিভিন্ন টেবিলের মধ্যে JOIN ব্যবহার করে ডেটা মিলিয়ে দেখতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে orders এবং customers টেবিল যোগ করা হয়েছে।

SQL Query:

SELECT o.order_id, o.order_date, c.customer_name
FROM orders o
JOIN customers c
ON o.customer_id = c.customer_id;
  • এখানে orders এবং customers টেবিলের মধ্যে customer_id কলামের উপর ভিত্তি করে যোগ করা হয়েছে।
  • এটি প্রতিটি অর্ডারের জন্য গ্রাহকের নাম এবং অর্ডার আইডি ফেরত দেয়।

7. পানি চিহ্ন (WATERMARK) ব্যবহার করে ইভেন্ট টাইম প্রসেসিং

Flink SQL-এ WATERMARK ব্যবহার করে ইভেন্ট টাইমের উপর ভিত্তি করে প্রসেসিং করা যায়, যা লেট ইভেন্টগুলো হ্যান্ডেল করতে সাহায্য করে। নিচে একটি টেবিলের উদাহরণ দেয়া হলো যেখানে event_time এর উপর ভিত্তি করে watermark তৈরি করা হয়েছে।

SQL Query:

CREATE TABLE events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'event-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
  • এই টেবিল Kafka থেকে ডেটা পড়ে এবং event_time কলামের উপর ভিত্তি করে watermark তৈরি করে।
  • WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND এর মাধ্যমে ইভেন্ট টাইম থেকে ২ সেকেন্ড পিছিয়ে watermark সেট করা হয়েছে।

উপসংহার

Flink SQL দিয়ে আপনি বিভিন্ন ধরনের query চালাতে পারেন, যা স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং-এর জন্য খুবই কার্যকর। এর মাধ্যমে ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট এবং উইন্ডো প্রসেসিং সহজে করা যায়। Flink SQL-এ ডেটাবেজের মতো টেবিল তৈরি করা, সেগুলোর মধ্যে সম্পর্ক তৈরি করা এবং কাস্টম উইন্ডো এবং ইভেন্ট টাইম প্রসেসিং করার মাধ্যমে ডেটা বিশ্লেষণকে সহজ এবং কার্যকর করা যায়।

আরও দেখুন...

Promotion