Custom Functions এবং Filters যোগ করা

Storm এর জন্য Custom Spouts এবং Bolts - অ্যাপাচি স্টর্ম (Apache Storm) - Big Data and Analytics

409

Apache Storm একটি রিয়েল-টাইম ডিসট্রিবিউটেড ডেটা প্রসেসিং সিস্টেম, যা ডেটা স্ট্রিম প্রক্রিয়া করতে ব্যবহৃত হয়। Storm টপোলজির মধ্যে ডেটার ট্রান্সফরমেশন, ফিল্টারিং, এবং অন্যান্য কাস্টম কার্যক্রম যুক্ত করতে Custom Functions এবং Filters ব্যবহার করা যেতে পারে। যখন ডেটার উপর নির্দিষ্ট ট্রান্সফরমেশন বা ফিল্টারিং কার্যকলাপ প্রয়োজন হয়, তখন Storm-এর Bolt বা Spout-এ কাস্টম ফাংশন এবং ফিল্টার যোগ করা খুবই কার্যকরী।

এই টিউটোরিয়ালে, আমরা আলোচনা করব কিভাবে Custom Functions এবং Filters Storm টপোলজিতে যোগ করা যায় এবং সেগুলি কিভাবে ব্যবহার করা যায়।


১. Custom Functions যোগ করা

Custom Functions Storm টপোলজির মধ্যে ডেটার বিশেষ কার্যক্রম বা ট্রান্সফরমেশন প্রয়োগ করতে ব্যবহৃত হয়। আপনি Storm-এ Bolt ব্যবহার করে ডেটার ওপর কাস্টম ফাংশন প্রয়োগ করতে পারেন। কাস্টম ফাংশনগুলি সাধারন ট্রান্সফরমেশন যেমন ডেটার স্কেলিং, মান যাচাই, অ্যাগ্রিগেশন ইত্যাদি পরিচালনা করতে ব্যবহৃত হতে পারে।

১.১ Custom Function উদাহরণ

ধরা যাক, আপনি ডেটার একটি নির্দিষ্ট ফিল্ডে মান স্কেল করতে চান, যেমন একজন ব্যবহারকারীর ইনকাম (income) স্কেল করা। এর জন্য আপনি একটি কাস্টম ফাংশন তৈরি করতে পারেন।

public class CustomFunction implements Function {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String name = tuple.getStringByField("name");
        Integer income = tuple.getIntegerByField("income");

        // Custom transformation (scale income)
        Integer scaledIncome = income * 2;

        // Emit transformed result
        collector.emit(new Values(name, scaledIncome));
    }
}

এখানে, CustomFunction একটি কাস্টম ফাংশন যা income ফিল্ডের মান স্কেল করে এবং পরবর্তী প্রক্রিয়ার জন্য তা আউটপুট করে। এই কাস্টম ফাংশনটি Bolt এর মধ্যে ব্যবহৃত হতে পারে।

১.২ Custom Function-কে Bolt এর মধ্যে ব্যবহার করা

public class MyBolt extends BaseBasicBolt {
    private CustomFunction customFunction;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Initialize the custom function
        customFunction = new CustomFunction();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Apply custom function to the tuple
        customFunction.execute(tuple, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name", "scaledIncome"));
    }
}

এখানে, MyBolt বোল্টের মধ্যে CustomFunction ব্যবহার করা হয়েছে, যা ডেটার মান স্কেল করে এবং পরবর্তী প্রক্রিয়ার জন্য ফলাফল প্রস্তুত করে।


২. Filters যোগ করা

Filters Storm-এ ডেটার একটি অংশ বাদ দেওয়ার জন্য বা শর্তাধীনভাবে ডেটা গ্রহণ করার জন্য ব্যবহৃত হয়। আপনি Bolt এর মধ্যে ফিল্টার যুক্ত করতে পারেন, যাতে নির্দিষ্ট শর্ত পূর্ণ হলে ডেটা প্রক্রিয়া করা হয় এবং অন্যথায় বাদ দেওয়া হয়।

২.১ Filter Function উদাহরণ

ধরা যাক, আপনি ডেটা ফিল্টার করতে চান যেখানে income ১০,০০০-এর বেশি হতে হবে। এমন একটি filter ফাংশন তৈরি করা যেতে পারে:

public class IncomeFilter implements Filter {
    @Override
    public boolean isValid(Tuple tuple) {
        Integer income = tuple.getIntegerByField("income");
        return income > 10000;  // Only allow income greater than 10000
    }
}

এখানে, IncomeFilter ফাংশন একটি শর্ত প্রদান করছে, যা income ১০,০০০ এর বেশি হলে তবেই ডেটা গ্রহণ করবে, অন্যথায় ফিল্টার করবে।

২.২ Filter-কে Bolt এর মধ্যে ব্যবহার করা

public class MyFilteredBolt extends BaseBasicBolt {
    private IncomeFilter incomeFilter;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Initialize the filter
        incomeFilter = new IncomeFilter();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Apply filter to the tuple
        if (incomeFilter.isValid(tuple)) {
            // If valid, process and emit
            String name = tuple.getStringByField("name");
            Integer income = tuple.getIntegerByField("income");
            collector.emit(new Values(name, income));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name", "income"));
    }
}

এখানে, MyFilteredBolt বোল্টের মধ্যে IncomeFilter ব্যবহার করা হয়েছে। এটি ডেটা ফিল্টার করে, এবং শুধু শর্ত পূর্ণ হলে ডেটা প্রক্রিয়া এবং আউটপুট করবে।


৩. Custom Functions এবং Filters এর সেরা অভ্যাস

Storm টপোলজিতে Custom Functions এবং Filters ব্যবহার করার সময় কিছু best practices অনুসরণ করা উচিত:

৩.১ Reuse Custom Functions

কাস্টম ফাংশনগুলি পুনঃব্যবহারযোগ্য হওয়া উচিত। একাধিক টাস্ক বা বোল্টে একই কাস্টম ফাংশন ব্যবহার করতে হলে, ফাংশনগুলিকে ভিন্ন ভিন্ন অংশে বিভক্ত করুন এবং সেগুলিকে স্টেটলেস রাখুন।

৩.২ Efficient Filters

ফিল্টার ব্যবহারের সময় শর্তগুলি দ্রুত এবং কার্যকরী হওয়া উচিত। ফিল্টার ফাংশনগুলি সিম্পল এবং কমপ্লেক্স না রাখাই ভালো। যদি ডেটার উপর অনেক গুলি শর্তের ভিত্তিতে ফিল্টার প্রয়োজন হয়, তবে সেই ফিল্টারগুলি একসাথে যোগ করা যেতে পারে।

৩.৩ Error Handling

কাস্টম ফাংশন এবং ফিল্টার ব্যবহারের সময় সম্ভাব্য ত্রুটিগুলি হ্যান্ডেল করার জন্য যথাযথ exception handling ব্যবস্থাপনা থাকতে হবে। এটি সিস্টেমের স্থায়িত্ব এবং সঠিকতা নিশ্চিত করবে।

৩.৪ Maintainable Code

কাস্টম ফাংশন এবং ফিল্টারগুলি যতটা সম্ভব পরিষ্কার এবং রিডেবল হওয়া উচিত, যাতে ভবিষ্যতে যখন পরিবর্তন প্রয়োজন হবে, তখন কোডটি সহজে বজায় রাখা যায়।


সারাংশ

Apache StormCustom Functions এবং Filters যোগ করে আপনি ডেটা স্ট্রিম প্রক্রিয়াকরণ এবং ট্রান্সফরমেশন কার্যক্রম কাস্টমাইজ করতে পারেন। Custom Functions Storm-এর Bolt এ ডেটার বিশেষ ট্রান্সফরমেশন করার জন্য ব্যবহার করা হয়, এবং Filters ডেটা ফিল্টার করতে বা নির্দিষ্ট শর্ত পূর্ণ হলে ডেটা গ্রহণ করতে ব্যবহৃত হয়। Storm-এ এই কাস্টম ফাংশন এবং ফিল্টার ব্যবহার করে ডেটা প্রক্রিয়াকরণ আরও ফ্লেক্সিবল এবং কার্যকরী করা যায়।

Content added By
Promotion

Are you sure to start over?

Loading...