Apache Storm একটি শক্তিশালী রিয়েল-টাইম ডিসট্রিবিউটেড ডেটা প্রসেসিং সিস্টেম, যা spouts এবং bolts ব্যবহার করে ডেটা প্রক্রিয়া করে। Spout ডেটা সংগ্রহ করে এবং Bolt সেই ডেটার উপর প্রক্রিয়া বা ট্রান্সফরমেশন সম্পন্ন করে। যদিও Storm-এর ডিফল্ট Bolts অনেক সাধারণ ডেটা প্রক্রিয়াকরণের জন্য উপযুক্ত, তবে কখনও কখনও আপনাকে আপনার নিজস্ব Custom Bolt তৈরি করতে হতে পারে, যেটি আপনার নির্দিষ্ট ডেটা প্রসেসিং চাহিদা পূরণ করবে।
এই টিউটোরিয়ালে, আমরা Custom Bolts তৈরি করার পদ্ধতি এবং তার প্রক্রিয়া নিয়ে আলোচনা করব।
১. Bolt কি এবং কেন Custom Bolt প্রয়োজন?
Bolt হল Storm টপোলজির একটি মূল উপাদান, যা প্রাপ্ত ডেটার উপর বিভিন্ন প্রক্রিয়া বা ট্রান্সফরমেশন সম্পাদন করে। Custom Bolt তৈরি করার প্রয়োজন তখন হয় যখন আপনাকে নির্দিষ্ট ধরনের ডেটা প্রক্রিয়া করতে হয় যা Storm-এর ডিফল্ট বোল্টগুলির মাধ্যমে করা সম্ভব নয়।
Custom Bolt সাধারণত:
- ডেটার ফিল্টারিং বা ট্রান্সফরমেশন
- ডেটাবেস বা API-তে ডেটা ইনসার্ট বা আপডেট
- একটি বিশেষ হিসাব বা অ্যালগরিদম প্রয়োগ
- ডেটার উপরে কাস্টম লজিক প্রয়োগ
২. Storm-এর Custom Bolt তৈরি করার প্রাথমিক পদক্ষেপ
Storm-এ Custom Bolt তৈরি করতে আপনাকে Storm-এ BasicBolt বা IRichBolt ক্লাসের মাধ্যমে একটি কাস্টম বোল্ট তৈরি করতে হবে। Storm দুটি ক্লাস প্রদান করে:
- BaseBasicBolt: একটি সহজ বোল্ট তৈরি করার জন্য যা
execute()মেথডের মাধ্যমে প্রক্রিয়া করে। - IRichBolt: এক্সটেনসিবল বোল্ট যা
prepare(),execute(), এবংcleanup()মেথড সাপোর্ট করে, এবং অতিরিক্ত ফিচার যেমন মেমরি স্টেট এবং কনফিগারেশন ম্যানেজমেন্ট সহ আসে।
২.১ Custom Bolt তৈরি করার উদাহরণ
এখানে একটি কাস্টম বোল্ট তৈরি করার উদাহরণ দেওয়া হলো, যা ইনপুট ডেটার উপর একটি অগমেন্টেশন প্রক্রিয়া সম্পাদন করবে এবং প্রক্রিয়া করা ডেটাকে আউটপুট হিসাবে পাঠাবে।
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.bolt.BaseBasicBolt;
public class MyCustomBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// Get data from the tuple
String value = input.getStringByField("message");
// Process data - example of adding a prefix to the message
String processedValue = "Processed: " + value;
// Emit the processed data
collector.emit(new Values(processedValue));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Declare the output fields - in this case, the processed message
declarer.declare(new Fields("processed_message"));
}
}
এখানে:
execute()মেথডে ইনপুট Tuple থেকে ডেটা বের করে প্রক্রিয়া করা হয়েছে।collector.emit()ব্যবহার করে প্রক্রিয়া করা ডেটা আউটপুট হিসাবে পাঠানো হয়েছে।declareOutputFields()মেথডে আউটপুট ফিল্ডের নাম ঘোষণা করা হয়েছে।
৩. Custom Bolt এর জন্য State Management
Storm-এ আপনি যখন Stateful Bolt তৈরি করবেন, তখন আপনাকে state পরিচালনার জন্য কিছু অতিরিক্ত লজিক যোগ করতে হবে। Storm এর IRichBolt ইন্টারফেস prepare() মেথড, যেখানে আপনি state তৈরি এবং কনফিগারেশন পরিচালনা করতে পারেন।
৩.১ Stateful Bolt উদাহরণ
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.state.StormState;
import org.apache.storm.state.State;
public class StatefulBolt implements IRichBolt {
private OutputCollector collector;
private State state;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
// Set up state for tracking or storing intermediate values
this.state = new StormState(); // example of a state object
}
@Override
public void execute(Tuple input) {
// Access and modify state as needed
String value = input.getStringByField("message");
String processedValue = "Processed: " + value;
// Update state (example)
state.put("last_processed", processedValue);
collector.emit(new Values(processedValue));
}
@Override
public void cleanup() {
// Cleanup state if necessary
if (state != null) {
state.close();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("processed_message"));
}
}
এখানে, State ব্যবহারের মাধ্যমে স্পষ্টভাবে stateful তথ্য পরিচালনা করা হচ্ছে, যা ডেটার প্রক্রিয়াকরণের সময় ব্যবহৃত হয় এবং পরবর্তী প্রসেসে ব্যবহারের জন্য state আপডেট করা হয়।
৪. Custom Bolt এ Error Handling
Storm-এ Error Handling গুরুত্বপূর্ণ, কারণ কিছু ব্যতিক্রমী অবস্থায় প্রক্রিয়া ব্যর্থ হতে পারে। try-catch ব্লক ব্যবহার করে আপনি Exception সঠিকভাবে ধরতে পারেন এবং ব্যর্থ টাস্ক পুনরায় চেষ্টা করতে পারেন।
public class MyErrorHandlingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
String value = tuple.getStringByField("message");
// Process the value
String processedValue = "Processed: " + value;
collector.emit(new Values(processedValue));
} catch (Exception e) {
// Log the exception and fail the tuple for retry
System.err.println("Error processing tuple: " + e.getMessage());
collector.fail(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("processed_message"));
}
}
এখানে, কোনো ত্রুটি ঘটলে fail() ব্যবহার করে সেই Tuple পুনরায় প্রক্রিয়া করার জন্য নির্দেশনা দেয়া হচ্ছে।
৫. Custom Bolt Optimization Techniques
Storm টপোলজির পারফরম্যান্স উন্নত করতে কিছু optimization techniques রয়েছে:
- Parallel Processing: Custom Bolt-এ প্রক্রিয়াকরণের জন্য বেশি থ্রেড বা টাস্ক ব্যবহার করে, আপনি পারফরম্যান্স উন্নত করতে পারেন।
- Batching: একে একে ডেটা প্রক্রিয়া না করে ব্যাচে ডেটা প্রক্রিয়া করুন, যাতে স্ট্রিমিং ডেটা দ্রুত প্রক্রিয়া করা যায়।
- Stateful Processing: যখন অনেক ডেটা প্রসেস করার প্রয়োজন হয়, তখন stateful bolts ব্যবহার করুন, যা পূর্ববর্তী স্টেটের ভিত্তিতে পরবর্তী প্রসেসিং করে।
সারাংশ
Storm-এ Custom Bolts তৈরি করে আপনি আপনার নির্দিষ্ট ডেটা প্রসেসিং চাহিদা অনুযায়ী কাস্টম ট্রান্সফরমেশন, ফিল্টারিং, অ্যাগ্রিগেশন বা অন্য যেকোনো প্রক্রিয়া কার্যকরীভাবে করতে পারেন। Storm-এ stateful এবং stateless বোল্টের মাধ্যমে ডেটা প্রক্রিয়া এবং স্টেট ম্যানেজমেন্ট করা সম্ভব। Error handling এবং performance optimization techniques ব্যবহার করে আপনি আপনার Custom Bolt আরও কার্যকরী এবং পারফরম্যান্স-বান্ধব করতে পারেন।
Read more