Apache Storm একটি রিয়েল-টাইম ডিসট্রিবিউটেড ডেটা প্রসেসিং সিস্টেম, যা দ্রুত ডেটা স্ট্রিম প্রক্রিয়া করতে ব্যবহৃত হয়। Storm টপোলজিতে Spouts এবং Bolts প্রধান উপাদান হিসেবে কাজ করে। Spouts হলো ডেটা সোর্স থেকে ডেটা সংগ্রহ করার জন্য ব্যবহৃত একক, যা Storm টপোলজির মধ্যে ডেটা প্রেরণ করে। কখনো কখনো আমাদের নির্দিষ্ট ডেটা সোর্সের জন্য Custom Spout তৈরি করার প্রয়োজন পড়ে। এই প্রক্রিয়ায়, আমরা আমাদের প্রয়োজন অনুযায়ী Spout ডিজাইন ও কাস্টমাইজ করি এবং Storm টপোলজিতে Deploy করি।
এই টিউটোরিয়ালে, আমরা শিখব কিভাবে Custom Spout তৈরি করা যায় এবং Storm ক্লাস্টারে এটি Deploy করা যায়।
১. Custom Spouts তৈরি (Create Custom Spouts)
Storm-এ Custom Spout তৈরি করতে, আপনাকে Storm API ব্যবহার করতে হবে এবং IRichSpout ইন্টারফেসের মাধ্যমে একটি নতুন স্পাউট ক্লাস তৈরি করতে হবে। এই ক্লাসটি nextTuple() মেথড ব্যবহার করে ডেটা সংগ্রহ করবে এবং open(), close(), এবং declareOutputFields() মেথডগুলোও রি-ডিফাইন করতে হবে।
১.১ Custom Spout তৈরি করার পদক্ষেপ:
- IRichSpout Interface ইমপ্লিমেন্ট করুন।
- open() মেথডে ডেটা সোর্সের সাথে সংযোগ স্থাপন করুন।
- nextTuple() মেথডে ডেটা সংগ্রহ করুন এবং emit() করুন।
- declareOutputFields() মেথডে টুপলের ফিল্ড ঘোষণা করুন।
১.২ Custom Spout উদাহরণ:
এখানে একটি Custom Spout উদাহরণ দেওয়া হলো যা একটি কাস্টম ডেটা সোর্স (যেমন একটি ডাটাবেস বা একটি ফাইল) থেকে ডেটা সংগ্রহ করবে এবং Storm টপোলজির মধ্যে পাঠাবে।
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
public class MyCustomSpout implements IRichSpout {
private SpoutOutputCollector collector;
private boolean isEmitting = true;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
// Here you can open a database connection, file reader, or network connection
}
@Override
public void nextTuple() {
if (isEmitting) {
// Simulate data collection
String data = "Sample Data"; // This would be data from your source, such as a database, file, etc.
collector.emit(new Values(data)); // Emit the collected data
isEmitting = false; // After emitting, stop emitting for a while
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("data")); // Declare the output fields for the spout
}
@Override
public void close() {
// Close any resources if needed
}
@Override
public void activate() {
// Logic when spout is activated
}
@Override
public void deactivate() {
// Logic when spout is deactivated
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; // Configuration details for the spout
}
}
এখানে:
open()মেথডে ডেটা সোর্সের সাথে সংযোগ স্থাপন করা হয়।nextTuple()মেথডে ডেটা সংগ্রহ করা হয় এবং Storm ক্লাস্টারের মধ্যে পাঠানো হয়।declareOutputFields()মেথডে স্পাউটের আউটপুট ফিল্ডগুলি ঘোষণা করা হয়।
২. Custom Spout Deploy করা
একবার আপনি আপনার Custom Spout তৈরি করার পর, আপনাকে এটি Storm টপোলজিতে ডিপ্লয় করতে হবে। Storm-এ একটি স্পাউট ডিপ্লয় করতে, প্রথমে এটি একটি টপোলজিতে যুক্ত করতে হবে এবং তারপর সেই টপোলজিকে Storm ক্লাস্টারে চালু করতে হবে।
২.১ Custom Spout টপোলজিতে যুক্ত করা:
- TopologyBuilder ব্যবহার করে আপনার Custom Spout কে Storm টপোলজিতে যুক্ত করুন।
- Spout কে
setSpout()মেথড ব্যবহার করে টপোলজিতে যুক্ত করুন।
২.২ Custom Spout Example Deployment:
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
public class MyTopology {
public static void main(String[] args) throws Exception {
// Create a TopologyBuilder instance
TopologyBuilder builder = new TopologyBuilder();
// Create an instance of the custom spout
builder.setSpout("my-custom-spout", new MyCustomSpout(), 1); // "my-custom-spout" is the spout id, and 1 is the parallelism
// Create a local cluster for testing (this can be switched to a distributed cluster in production)
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
// Submit the topology to the local cluster
cluster.submitTopology("my-topology", conf, builder.createTopology());
}
}
এখানে:
setSpout()মেথডে আমাদের Custom Spout যুক্ত করা হয়েছে। স্পাউটের নাম"my-custom-spout"এবং এর parallelism সংখ্যা1দেওয়া হয়েছে।- তারপর টপোলজিটি LocalCluster এ submit করা হয়েছে। এটি স্টর্মের মধ্যে চলমান একটি স্থানীয় ক্লাস্টার।
২.৩ Custom Spout টপোলজি রান করা:
- আপনার কোড কমপাইল এবং রান করতে Storm-এর
storm jarকমান্ড ব্যবহার করতে পারেন। - Storm-এ আপনার টপোলজি ডিপ্লয় করার জন্য Nimbus সার্ভারে এটি জমা দিতে হবে।
storm jar my-topology.jar com.example.MyTopology
৩. Custom Spout-এর কার্যকারিতা এবং মনিটরিং
Storm-এ Custom Spout কার্যকরী করতে, আপনাকে এটি পরীক্ষা এবং মনিটরিং করা প্রয়োজন। Storm-এর UI ব্যবহার করে আপনি স্পাউটের কার্যকারিতা এবং এর বিভিন্ন পরামিতি যেমন tuple emissions, latency, acknowledgements, ইত্যাদি মনিটর করতে পারবেন।
৩.১ Spout Latency এবং Throughput
Storm UI তে আপনি স্পাউটের latency এবং throughput ট্র্যাক করতে পারবেন। এতে আপনি দেখতে পারবেন স্পাউটের কার্যকারিতা কতটা দ্রুত এবং সঠিকভাবে ডেটা প্রক্রিয়া হচ্ছে।
৩.২ Spout Failure Monitoring
Storm UI এবং Logs মনিটর করে আপনি আপনার স্পাউটের ব্যর্থতা শনাক্ত করতে পারবেন। যদি আপনার স্পাউট কোনো কারণে ব্যর্থ হয়, আপনি তা retry মেকানিজম বা backpressure handling দ্বারা ম্যানেজ করতে পারেন।
সারাংশ
Storm-এ Custom Spout তৈরি করা একটি গুরুত্বপূর্ণ কাজ, যা Storm-এর রিয়েল-টাইম ডেটা প্রক্রিয়াকরণ এবং স্ট্রিমিং ডেটা সংগ্রহের ক্ষমতা বৃদ্ধি করে। Spout তৈরি করার সময়, ডেটা সোর্স থেকে ডেটা সংগ্রহ করে Storm টপোলজিতে প্রেরণ করার জন্য IRichSpout ইন্টারফেস ইমপ্লিমেন্ট করতে হয়। এরপর, আপনি টপোলজি তৈরি করে এটি Storm ক্লাস্টারে ডিপ্লয় করতে পারেন। Storm UI ব্যবহার করে আপনি স্পাউটের কার্যকারিতা এবং স্পিড ট্র্যাক করতে পারবেন এবং প্রয়োজনে এটি অপটিমাইজ করতে পারবেন। Custom Spout Storm-এ ডেটা সংগ্রহ ও প্রক্রিয়াকরণ কাজে ব্যবহার হতে পারে বিভিন্ন বাস্তব পরিস্থিতিতে, যেমন ডেটাবেস থেকে ডেটা সংগ্রহ, ওয়েব সার্ভিস থেকে ডেটা প্রাপ্তি ইত্যাদি।
Read more