Custom Spouts তৈরি করা

Spouts এবং Bolts এর ধারণা - অ্যাপাচি স্টর্ম (Apache Storm) - Big Data and Analytics

369

Spouts হলো Apache Storm টপোলজির একটি গুরুত্বপূর্ণ অংশ, যেগুলি ডেটা সংগ্রহ করে এবং Storm সিস্টেমে প্রবাহিত করে। Spouts সাধারণত ডেটার উৎস থেকে ডেটা নিয়ে আসে, যেমন মেসেজ কিউ, ডেটাবেস, বা ফাইল সিস্টেম। কখনও কখনও, আপনি একটি Custom Spout তৈরি করতে পারেন যেটি আপনার নির্দিষ্ট ডেটা উৎস বা প্রক্রিয়াকরণের জন্য উপযুক্ত।

এই গাইডে আমরা শিখব কিভাবে Custom Spout তৈরি করতে হয় এবং কীভাবে Storm টপোলজিতে সেটি ব্যবহার করা যায়।


Custom Spout কী?

Custom Spout হল একটি কাস্টম ক্লাস যা Storm-এ ডেটা প্রবাহিত করার জন্য তৈরি করা হয়। এটি একটি ডেটা উৎস (যেমন Kafka, RabbitMQ, ফাইল সিস্টেম ইত্যাদি) থেকে ডেটা সংগ্রহ করে এবং emit() মেথড ব্যবহার করে Storm টপোলজিতে পাঠায়। Storm এ Spout সাধারণত IRichSpout ইন্টারফেস অথবা BaseRichSpout ক্লাসের মাধ্যমে তৈরি করা হয়।

Spout তৈরি করতে কি কি প্রয়োজন?

  1. Spout ক্লাস তৈরি করুন: আপনার কাস্টম স্পাউট ক্লাস তৈরি করুন যা Storm এর IRichSpout ইন্টারফেস ইমপ্লিমেন্ট করবে অথবা BaseRichSpout ক্লাস এক্সটেন্ড করবে।
  2. ডেটা উৎস থেকে ডেটা সংগ্রহ: Spout টাস্কের কাজ হলো ডেটা সংগ্রহ করা এবং সেটি Storm টপোলজিতে পাঠানো।
  3. emit() মেথড ব্যবহার করুন: Spout এর ডেটা প্রসেসিং বা স্ট্রিমের মাধ্যমে ইমিট করতে emit() মেথড ব্যবহার করুন।

Custom Spout তৈরি করার ধাপ

এখানে আমরা একটি Custom Spout তৈরি করার প্রক্রিয়া এবং কোড উদাহরণ দেখব।

১. BaseRichSpout ক্লাস তৈরি করা

Storm এর BaseRichSpout ক্লাসটিকে এক্সটেন্ড করে Custom Spout তৈরি করা হয়। BaseRichSpout ব্যবহার করলে আপনি বেশ কিছু ডিফল্ট কার্যক্রম সহজে পরিচালনা করতে পারেন, যেমন open(), nextTuple(), এবং close() মেথডগুলি।

২. Custom Spout কোড উদাহরণ

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MyCustomSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int counter;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.counter = 0;  // ডেটা উৎপন্ন করতে কাউন্টার ব্যবহার
    }

    @Override
    public void nextTuple() {
        // প্রতি সময়ে একটি নতুন টুপল ইমিট করা
        if (counter < 5) {  // 5টি আইটেম উৎপন্ন করার জন্য
            collector.emit(new Values("Message " + counter));  // একটি টুপল তৈরি করে ইমিট করা
            counter++;
        } else {
            try {
                Thread.sleep(1000);  // যদি ডেটা না থাকে, তবে এক সেকেন্ড অপেক্ষা
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void close() {
        // Spout বন্ধ করার সময় প্রয়োজনীয় ক্লিন-আপ কোড লিখতে পারেন।
    }

    @Override
    public void declareOutputFields(Fields declarer) {
        // Spout থেকে যে ফিল্ডগুলি আউটপুট হিসেবে প্রদান হবে, তা ঘোষণা করা
        declarer.declare(new Fields("message"));
    }
}

এখানে আমরা একটি সাধারণ MyCustomSpout তৈরি করেছি যা প্রতি বার একটি নতুন বার্তা (যেমন "Message 0", "Message 1", ইত্যাদি) উৎপন্ন করবে এবং সেটি Storm টপোলজিতে পাঠাবে।

  • open(): Spout এর ইনিশিয়ালাইজেশন কাজ সম্পন্ন করে এবং এটি SpoutOutputCollector প্রাপ্ত করে, যা টুপলগুলিকে emit() করে পাঠাতে সাহায্য করে।
  • nextTuple(): এখানে ডেটা উৎপন্ন হয় এবং emit() মেথডের মাধ্যমে Storm টপোলজিতে পাঠানো হয়।
  • close(): Spout বন্ধ হওয়ার সময় কিছু ক্লিন-আপ কাজ করা যায়।
  • declareOutputFields(): Spout থেকে আউটপুট হিসেবে যেসব ফিল্ড পাঠানো হবে তা ডিফাইন করা হয়।

৩. Custom Spout টপোলজিতে যুক্ত করা

Custom Spout তৈরি করার পর, আপনাকে এটি Storm টপোলজিতে যুক্ত করতে হবে। নিচে একটি টপোলজি কোড উদাহরণ দেওয়া হলো যেখানে আমরা আমাদের কাস্টম স্পাউট ব্যবহার করছি।

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.LocalCluster;

public class MyStormTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // কাস্টম স্পাউট টপোলজিতে যুক্ত করা
        builder.setSpout("myCustomSpout", new MyCustomSpout(), 1);

        // একটি বোল্ট যুক্ত করা
        builder.setBolt("printBolt", new PrintBolt(), 1).shuffleGrouping("myCustomSpout");

        // টপোলজি তৈরি
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("myTopolgy", new Config(), builder.createTopology());

        // টপোলজি কিছুক্ষণ চলার পর ক্লাস্টার বন্ধ করা
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.shutdown();
    }
}

এখানে আমরা MyCustomSpout কে setSpout() মেথডের মাধ্যমে টপোলজিতে যুক্ত করেছি এবং একটি PrintBolt বোল্ট ব্যবহার করেছি, যা Spout থেকে প্রাপ্ত টুপল প্রিন্ট করবে।


৪. Custom Spout এর বোল্ট তৈরি করা

একটি সাধারণ PrintBolt বোল্ট যা Spout থেকে প্রাপ্ত টুপল প্রিন্ট করবে:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BasicBolt {
    @Override
    public void execute(Tuple input) {
        // Spout থেকে আসা মেসেজ প্রিন্ট করা
        String message = input.getStringByField("message");
        System.out.println("Received: " + message);
    }

    @Override
    public void declareOutputFields(Fields declarer) {
        // এই বোল্ট কোন আউটপুট দিবে না
    }
}

এটি কাস্টম স্পাউটের আউটপুট টুপলগুলো প্রিন্ট করবে।


সারাংশ

Custom Spout তৈরি করার মাধ্যমে আপনি Storm টপোলজিতে আপনার প্রয়োজনীয় ডেটা উৎস বা লজিক ইনজেক্ট করতে পারেন। BaseRichSpout বা IRichSpout ইন্টারফেস ব্যবহার করে Spout তৈরি করা হয়, যা Storm টপোলজির মধ্যে ডেটা প্রবাহিত করার জন্য কার্যকরী। Spout এর মাধ্যমে ডেটা সংগ্রহ, ইমিট করা, এবং আউটপুট ফিল্ড ঘোষণা করা সম্ভব।

Content added By
Promotion

Are you sure to start over?

Loading...