Storm এবং Hadoop এর জন্য Data Processing Pipelines

Storm এবং Apache Hadoop Integration - অ্যাপাচি স্টর্ম (Apache Storm) - Big Data and Analytics

444

Apache Storm এবং Apache Hadoop একত্রে কাজ করতে পারে একটি শক্তিশালী Data Processing Pipeline তৈরি করার জন্য। যেখানে Storm রিয়েল-টাইম ডেটা স্ট্রিমিং এবং প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়, সেখানে Hadoop বৃহৎ পরিমাণ ডেটা ব্যাচ প্রসেসিং এবং বিশ্লেষণের জন্য ব্যবহৃত হয়। এই দুইটি সিস্টেম একত্রে রিয়েল-টাইম ডেটা সংগ্রহ এবং প্রক্রিয়া করার পাশাপাশি, সেই ডেটাকে বিশ্লেষণ এবং স্টোর করার জন্য কার্যকরী ডেটা প্রসেসিং পাইপলাইন তৈরি করতে সাহায্য করে।

এই টিউটোরিয়ালে, আমরা Storm এবং Hadoop এর মধ্যে ডেটা প্রসেসিং পাইপলাইন তৈরি করার প্রক্রিয়া এবং তাদের ইন্টিগ্রেশন নিয়ে আলোচনা করব।


Storm এবং Hadoop Data Processing Pipeline এর ভূমিকা

Data Processing Pipeline একটি স্ট্রিম (ধারা) যেখানে ডেটা সংগ্রহ, প্রক্রিয়া, এবং সংরক্ষণের জন্য একাধিক ধাপের মাধ্যমে ডেটা প্রবাহিত হয়। Storm এবং Hadoop একটি পূর্ণাঙ্গ ডেটা প্রসেসিং পাইপলাইন গড়ে তোলে, যেখানে Storm রিয়েল-টাইম ডেটা প্রসেস করে এবং Hadoop সেই ডেটাকে বিশ্লেষণ ও সংরক্ষণ করে।

  • Storm: লাইভ ডেটা স্ট্রিম সংগ্রহ এবং দ্রুত প্রক্রিয়াকরণের জন্য ব্যবহৃত হয়।
  • Hadoop: বৃহৎ পরিমাণে ডেটা ব্যাচ প্রসেসিং এবং বিশ্লেষণ করার জন্য ব্যবহৃত হয়, যেখানে ডেটা ব্যাচ আকারে প্রক্রিয়া করা হয় এবং HDFS (Hadoop Distributed File System) বা অন্য ডেটাবেসে সংরক্ষণ করা হয়।

Storm এবং Hadoop এর মধ্যে ডেটা প্রসেসিং পাইপলাইন তৈরি করা

Storm এবং Hadoop একত্রে কাজ করতে, প্রথমে Storm Spout ব্যবহার করে Kafka, HDFS, বা অন্য কোন ডেটা সোর্স থেকে ডেটা সংগ্রহ করতে হবে। তারপর Storm Bolt সেই ডেটার ওপর কাজ (যেমন ফিল্টারিং, ট্রান্সফরমেশন, অ্যাগ্রিগেশন) করবে এবং পরিশেষে সেই ডেটা HDFS বা Hadoop ক্লাস্টারে সংরক্ষণ করা হবে।

স্টেপ 1: Storm স্পাউট দিয়ে ডেটা সংগ্রহ

Storm-এ Spout ব্যবহার করে লাইভ ডেটা সংগ্রহ করা হয়। Spout সাধারণত ডেটা উৎস (যেমন Kafka, HDFS, API) থেকে ডেটা সংগ্রহ করে এবং Storm টপোলজির মধ্যে পাঠায়।

KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:9092", "data-topic")
    .setGroupId("storm-group")
    .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);

এখানে KafkaSpout Kafka থেকে ডেটা সংগ্রহ করে Storm-এ পাঠাচ্ছে। আপনি যদি HDFS বা অন্য ডেটা সোর্স ব্যবহার করতে চান, তবে সংশ্লিষ্ট স্পাউট কনফিগারেশন ব্যবহার করতে হবে।


স্টেপ 2: Storm বোল্ট দিয়ে ডেটা প্রক্রিয়া

Storm-এর Bolt ব্যবহার করে ডেটা প্রক্রিয়া করা হয়। Bolt সাধারণত ডেটার উপর ট্রান্সফরমেশন, অ্যাগ্রিগেশন বা ফিল্টারিং প্রক্রিয়া চালায়। আপনি এখানে ডেটার একাধিক ধাপের প্রক্রিয়া করতে পারেন, যেমন:

  • ডেটা ফিল্টারিং: নির্দিষ্ট শর্ত পূরণকারী ডেটা নির্বাচন করা।
  • ডেটা অ্যাগ্রিগেশন: একটি নির্দিষ্ট সময়কালে ডেটা একত্রিত করা।
  • ডেটা ট্রান্সফরমেশন: ডেটা রূপান্তর করা (যেমন, টেক্সট থেকে সংখ্যা, JSON থেকে XML ইত্যাদি)।
public class DataProcessingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String rawData = tuple.getStringByField("message");
        // Process the data here (e.g., transformation or aggregation)
        String processedData = rawData.toUpperCase();
        collector.emit(new Values(processedData));  // Emit processed data
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("processedData"));
    }
}

এখানে, DataProcessingBolt ডেটাকে রূপান্তর (যেমন, টেক্সটকে বড় হাতের অক্ষরে) করছে। আপনি এখানে আরও জটিল প্রক্রিয়া যেমন ডেটা বিশ্লেষণ বা অ্যাগ্রিগেশন যোগ করতে পারেন।


স্টেপ 3: Storm থেকে Hadoop HDFS-এ ডেটা লেখা

প্রক্রিয়া করা ডেটা HDFS বা Hadoop ডেটাবেসে লেখার জন্য, Storm-এ একটি Bolt ব্যবহার করতে হবে যা ডেটা লেখার কাজ করে। এখানে, Storm এর HDFS Bolt ব্যবহার করে ডেটা HDFS-এ সংরক্ষণ করা যাবে।

public class HdfsBolt extends BaseBasicBolt {
    private FileSystem fs;
    private Path outputPath;
    private BufferedWriter writer;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration hdfsConf = new Configuration();
        try {
            fs = FileSystem.get(URI.create("hdfs://namenode_host:9000"), hdfsConf);
            outputPath = new Path("/user/hdfs/output/outputfile.txt");
            writer = new BufferedWriter(new OutputStreamWriter(fs.create(outputPath, true)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple) {
        String processedData = tuple.getStringByField("processedData");
        try {
            writer.write(processedData);
            writer.newLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

এই HdfsBolt ডেটা প্রক্রিয়া করার পর HDFS-এ লেখে।


স্টেপ 4: Storm টপোলজি তৈরি এবং চালানো

Storm টপোলজি তৈরি করার পর, আপনি Storm LocalCluster বা Nimbus এর মাধ্যমে এটি চালাতে পারেন।

public class StormHadoopTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);

        // Create a topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // Set the Spout
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);

        // Set the Data Processing Bolt
        builder.setBolt("process-bolt", new DataProcessingBolt(), 2).shuffleGrouping("kafka-spout");

        // Set the HDFS Bolt
        builder.setBolt("hdfs-bolt", new HdfsBolt(), 1).shuffleGrouping("process-bolt");

        // Submit the topology
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("storm-hadoop-topology", conf, builder.createTopology());
        
        // Run for a while then stop
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

এই টপোলজি Storm থেকে Kafka থেকে ডেটা সংগ্রহ করে এবং HDFS-এ পাঠানোর মাধ্যমে একটি পূর্ণাঙ্গ ডেটা প্রসেসিং পাইপলাইন তৈরি করা হয়েছে।


Storm এবং Hadoop Data Processing Pipelines এর ব্যবহার

  1. রিয়েল-টাইম ডেটা স্ট্রিমিং: Storm লাইভ ডেটা স্ট্রিম সংগ্রহ করে, প্রক্রিয়া করে এবং সেই ডেটা Hadoop-এ বিশ্লেষণ ও সংরক্ষণ করার জন্য পাঠায়।
  2. বৃহৎ পরিমাণে ডেটা বিশ্লেষণ: Hadoop ব্যাচ প্রসেসিং করার মাধ্যমে Storm-এ প্রক্রিয়া করা ডেটার বিশ্লেষণ করতে পারে।
  3. ডেটার ইতিহাস: Storm এবং Hadoop এর মাধ্যমে, আপনি লাইভ ডেটার পাশাপাশি অতীতের বিশ্লেষণ করতে পারবেন।

সারাংশ

Apache Storm এবং Apache Hadoop একত্রে একটি শক্তিশালী Data Processing Pipeline তৈরি করে, যেখানে Storm রিয়েল-টাইম ডেটা সংগ্রহ এবং প্রক্রিয়া করে, এবং Hadoop ডেটাকে বিশ্লেষণ ও সংরক্ষণ করে। Storm এবং Hadoop একত্রে কাজ করতে, আপনি Kafka থেকে ডেটা সংগ্রহ করতে পারেন এবং Storm টপোলজির মাধ্যমে HDFS-এ সেই ডেটা লিখতে পারেন। এই ইন্টিগ্রেশন ডেটা প্রসেসিংকে আরও স্কেলেবল, কার্যকরী এবং ফল্ট টলারেন্ট করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...