Spouts হলো Apache Storm টপোলজির একটি গুরুত্বপূর্ণ অংশ, যেগুলি ডেটা সংগ্রহ করে এবং Storm সিস্টেমে প্রবাহিত করে। Spouts সাধারণত ডেটার উৎস থেকে ডেটা নিয়ে আসে, যেমন মেসেজ কিউ, ডেটাবেস, বা ফাইল সিস্টেম। কখনও কখনও, আপনি একটি Custom Spout তৈরি করতে পারেন যেটি আপনার নির্দিষ্ট ডেটা উৎস বা প্রক্রিয়াকরণের জন্য উপযুক্ত।
এই গাইডে আমরা শিখব কিভাবে Custom Spout তৈরি করতে হয় এবং কীভাবে Storm টপোলজিতে সেটি ব্যবহার করা যায়।
Custom Spout কী?
Custom Spout হল একটি কাস্টম ক্লাস যা Storm-এ ডেটা প্রবাহিত করার জন্য তৈরি করা হয়। এটি একটি ডেটা উৎস (যেমন Kafka, RabbitMQ, ফাইল সিস্টেম ইত্যাদি) থেকে ডেটা সংগ্রহ করে এবং emit() মেথড ব্যবহার করে Storm টপোলজিতে পাঠায়। Storm এ Spout সাধারণত IRichSpout ইন্টারফেস অথবা BaseRichSpout ক্লাসের মাধ্যমে তৈরি করা হয়।
Spout তৈরি করতে কি কি প্রয়োজন?
- Spout ক্লাস তৈরি করুন: আপনার কাস্টম স্পাউট ক্লাস তৈরি করুন যা Storm এর
IRichSpoutইন্টারফেস ইমপ্লিমেন্ট করবে অথবাBaseRichSpoutক্লাস এক্সটেন্ড করবে। - ডেটা উৎস থেকে ডেটা সংগ্রহ: Spout টাস্কের কাজ হলো ডেটা সংগ্রহ করা এবং সেটি Storm টপোলজিতে পাঠানো।
- emit() মেথড ব্যবহার করুন: Spout এর ডেটা প্রসেসিং বা স্ট্রিমের মাধ্যমে ইমিট করতে emit() মেথড ব্যবহার করুন।
Custom Spout তৈরি করার ধাপ
এখানে আমরা একটি Custom Spout তৈরি করার প্রক্রিয়া এবং কোড উদাহরণ দেখব।
১. BaseRichSpout ক্লাস তৈরি করা
Storm এর BaseRichSpout ক্লাসটিকে এক্সটেন্ড করে Custom Spout তৈরি করা হয়। BaseRichSpout ব্যবহার করলে আপনি বেশ কিছু ডিফল্ট কার্যক্রম সহজে পরিচালনা করতে পারেন, যেমন open(), nextTuple(), এবং close() মেথডগুলি।
২. Custom Spout কোড উদাহরণ
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class MyCustomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int counter;
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.counter = 0; // ডেটা উৎপন্ন করতে কাউন্টার ব্যবহার
}
@Override
public void nextTuple() {
// প্রতি সময়ে একটি নতুন টুপল ইমিট করা
if (counter < 5) { // 5টি আইটেম উৎপন্ন করার জন্য
collector.emit(new Values("Message " + counter)); // একটি টুপল তৈরি করে ইমিট করা
counter++;
} else {
try {
Thread.sleep(1000); // যদি ডেটা না থাকে, তবে এক সেকেন্ড অপেক্ষা
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void close() {
// Spout বন্ধ করার সময় প্রয়োজনীয় ক্লিন-আপ কোড লিখতে পারেন।
}
@Override
public void declareOutputFields(Fields declarer) {
// Spout থেকে যে ফিল্ডগুলি আউটপুট হিসেবে প্রদান হবে, তা ঘোষণা করা
declarer.declare(new Fields("message"));
}
}
এখানে আমরা একটি সাধারণ MyCustomSpout তৈরি করেছি যা প্রতি বার একটি নতুন বার্তা (যেমন "Message 0", "Message 1", ইত্যাদি) উৎপন্ন করবে এবং সেটি Storm টপোলজিতে পাঠাবে।
- open(): Spout এর ইনিশিয়ালাইজেশন কাজ সম্পন্ন করে এবং এটি SpoutOutputCollector প্রাপ্ত করে, যা টুপলগুলিকে emit() করে পাঠাতে সাহায্য করে।
- nextTuple(): এখানে ডেটা উৎপন্ন হয় এবং emit() মেথডের মাধ্যমে Storm টপোলজিতে পাঠানো হয়।
- close(): Spout বন্ধ হওয়ার সময় কিছু ক্লিন-আপ কাজ করা যায়।
- declareOutputFields(): Spout থেকে আউটপুট হিসেবে যেসব ফিল্ড পাঠানো হবে তা ডিফাইন করা হয়।
৩. Custom Spout টপোলজিতে যুক্ত করা
Custom Spout তৈরি করার পর, আপনাকে এটি Storm টপোলজিতে যুক্ত করতে হবে। নিচে একটি টপোলজি কোড উদাহরণ দেওয়া হলো যেখানে আমরা আমাদের কাস্টম স্পাউট ব্যবহার করছি।
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.LocalCluster;
public class MyStormTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
// কাস্টম স্পাউট টপোলজিতে যুক্ত করা
builder.setSpout("myCustomSpout", new MyCustomSpout(), 1);
// একটি বোল্ট যুক্ত করা
builder.setBolt("printBolt", new PrintBolt(), 1).shuffleGrouping("myCustomSpout");
// টপোলজি তৈরি
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("myTopolgy", new Config(), builder.createTopology());
// টপোলজি কিছুক্ষণ চলার পর ক্লাস্টার বন্ধ করা
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cluster.shutdown();
}
}
এখানে আমরা MyCustomSpout কে setSpout() মেথডের মাধ্যমে টপোলজিতে যুক্ত করেছি এবং একটি PrintBolt বোল্ট ব্যবহার করেছি, যা Spout থেকে প্রাপ্ত টুপল প্রিন্ট করবে।
৪. Custom Spout এর বোল্ট তৈরি করা
একটি সাধারণ PrintBolt বোল্ট যা Spout থেকে প্রাপ্ত টুপল প্রিন্ট করবে:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicBolt;
import org.apache.storm.tuple.Tuple;
public class PrintBolt extends BasicBolt {
@Override
public void execute(Tuple input) {
// Spout থেকে আসা মেসেজ প্রিন্ট করা
String message = input.getStringByField("message");
System.out.println("Received: " + message);
}
@Override
public void declareOutputFields(Fields declarer) {
// এই বোল্ট কোন আউটপুট দিবে না
}
}
এটি কাস্টম স্পাউটের আউটপুট টুপলগুলো প্রিন্ট করবে।
সারাংশ
Custom Spout তৈরি করার মাধ্যমে আপনি Storm টপোলজিতে আপনার প্রয়োজনীয় ডেটা উৎস বা লজিক ইনজেক্ট করতে পারেন। BaseRichSpout বা IRichSpout ইন্টারফেস ব্যবহার করে Spout তৈরি করা হয়, যা Storm টপোলজির মধ্যে ডেটা প্রবাহিত করার জন্য কার্যকরী। Spout এর মাধ্যমে ডেটা সংগ্রহ, ইমিট করা, এবং আউটপুট ফিল্ড ঘোষণা করা সম্ভব।
Read more