Spring Cloud Stream (Message-Driven Microservices)

Java Technologies - স্প্রিং ক্লাউড (Spring Cloud)
89
89

Spring Cloud Stream হল একটি হাই-এবস্ট্রাক্ট, ইভেন্ট-ড্রিভেন এবং মেসেজ-বেসড মাইক্রোসার্ভিস আর্কিটেকচার তৈরি করার জন্য ব্যবহৃত একটি ফ্রেমওয়ার্ক। এটি মাইক্রোসার্ভিসের মধ্যে এসিনক্রোনাস মেসেজ কমিউনিকেশন সমর্থন করে এবং বিভিন্ন মেসেজ ব্রোকার (যেমন RabbitMQ, Kafka) এর সাথে ইন্টিগ্রেটেড হয়। Spring Cloud Stream সার্ভিসগুলোকে আলাদা ও স্বতন্ত্রভাবে কাজ করতে সহায়ক করে এবং যোগাযোগের জন্য মেসেজ কিউ ব্যবহার করে।

Spring Cloud Stream এর উদ্দেশ্য:

  • Message-Driven Architecture: মাইক্রোসার্ভিস গুলোর মধ্যে অখণ্ডভাবে মেসেজ আদান-প্রদান।
  • Decoupling: মাইক্রোসার্ভিসগুলোকে একে অপরের থেকে ডিকাপল করা।
  • Scalability: মেসেজ কিউ ব্যাবহার করে উচ্চ স্কেলেবিলিটি অর্জন করা।
  • Resilience: একাধিক সার্ভিসের মধ্যে মেসেজ সিস্টেম সহজেই স্থিতিশীল এবং রেসিলিয়েন্ট থাকতে পারে।

Spring Cloud Stream Features:

  1. Binding: ইভেন্ট বা মেসেজ প্রেরণ এবং গ্রহণের জন্য আউটপুট এবং ইনপুট চ্যানেলগুলির মধ্যে "binding" সম্পর্ক তৈরি।
  2. Event-Driven: মাইক্রোসার্ভিসের মধ্যে ইভেন্ট বা মেসেজ মাধ্যমে যোগাযোগের উপায়।
  3. Multiple Binder Support: Kafka, RabbitMQ সহ বিভিন্ন মেসেজ ব্রোকারের সাথে কাজ করা।
  4. Dynamic Scaling: স্কেলিং সহজতর করতে মেসেজ ব্যাকগ্রাউন্ড পুশ করা।
  5. Transactional Messaging: মেসেজ প্রক্রিয়া করার সময় ট্রানজ্যাকশন সমর্থন।

Spring Cloud Stream Setup এবং কনফিগারেশন

Step 1: Maven Dependencies

প্রথমে Spring Cloud Stream এবং Kafka/RabbitMQ ডিপেন্ডেন্সি আপনার pom.xml ফাইলে যোগ করুন।

Example (Kafka):

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Cloud Stream with Kafka Binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

    <!-- Spring Cloud Stream Binder Dependency for Kafka -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
</dependencies>

Example (RabbitMQ):

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Cloud Stream with RabbitMQ Binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <!-- Spring Cloud Stream Binder Dependency for RabbitMQ -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
</dependencies>

Step 2: Application Configuration

application.yml বা application.properties ফাইলে Spring Cloud Stream কনফিগারেশন যোগ করুন। এখানে Kafka অথবা RabbitMQ ব্যবহার করা যেতে পারে। নিচে Kafka এর কনফিগারেশন দেখানো হয়েছে।

application.yml (Kafka Configuration):

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-topic
        input:
          destination: my-topic
      kafka:
        binder:
          brokers: localhost:9092  # Kafka broker address
          consumer:
            group: my-group

এখানে:

  • output: মেসেজ আউটপুট সেলুলার চ্যানেল।
  • input: মেসেজ ইনপুট সেলুলার চ্যানেল।
  • my-topic: Kafka টপিকের নাম।
  • brokers: Kafka ব্রোকারের ঠিকানা।

Step 3: Message Producer (Message Sender)

Spring Cloud Stream ব্যবহার করে আপনি মেসেজ প্রেরণ করতে পারেন। এই জন্য আপনি MessageChannel ব্যবহার করতে পারেন। নিচে একটি message producer উদাহরণ দেখানো হলো।

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Source.class) // Binding to the output channel
public class MessageProducer {

    private final Source source;

    public MessageProducer(Source source) {
        this.source = source;
    }

    public void sendMessage(String message) {
        source.output().send(MessageBuilder.withPayload(message).build()); // Sending message to the output channel
    }
}

এখানে, Source ক্লাস ব্যবহার করা হয়েছে যা output channel কে প্রোস্ট করা হয়েছে। মেসেজ প্রেরণের জন্য MessageBuilder ব্যবহার করা হয়েছে।

Step 4: Message Consumer (Message Receiver)

Message Consumer দ্বারা মেসেজ গ্রহণ করা হয়। এখানে MessageChannel ব্যবহার করে ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে।

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Source.class) // Binding to input channel
public class MessageConsumer {

    @StreamListener(target = Source.INPUT) // Listening to input channel
    public void handleMessage(String message) {
        System.out.println("Received message: " + message); // Process the received message
    }
}

এখানে, @StreamListener অ্যানোটেশন ব্যবহার করা হয়েছে, যা input channel থেকে মেসেজ গ্রহণ করে এবং প্রসেস করে।

Step 5: Run the Application

  1. Kafka/RabbitMQ ব্রোকার চালু করুন (যদি এটি না থাকে, তবে আপনি একটি লোকাল বা ক্লাউড-ভিত্তিক Kafka/RabbitMQ ব্রোকার ব্যবহার করতে পারেন)।
  2. Spring Boot Application চালু করুন।

Spring Cloud Stream Advanced Features

  1. Message-Driven Event Processing: Spring Cloud Stream মেসেজ প্রক্রিয়াকরণের জন্য ইভেন্ট-ড্রিভেন আর্কিটেকচারকে সমর্থন করে, যেখানে মাইক্রোসার্ভিস একে অপরের সাথে মেসেজ বা ইভেন্টের মাধ্যমে যোগাযোগ করে।
  2. Multiple Bindings: একাধিক মেসেজ চ্যানেল (Input/Output) এবং মেসেজ ব্রোকার ব্যবহারের জন্য সমর্থন প্রদান করা হয়।
  3. Transactional Messaging: মেসেজ প্রক্রিয়া করার সময় ট্রানজ্যাকশন সমর্থন প্রদান করা হয়।
  4. BindingProperties: বিভিন্ন মেসেজ ব্রোকার কনফিগারেশন সমর্থন (যেমন RabbitMQ, Kafka, Redis) করা যায়।

    Example (Multiple Binder Config):

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: my-topic
              binder: kafka
            input:
              destination: my-topic
              binder: rabbit
          binders:
            kafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: localhost:9092
            rabbit:
              type: rabbit
              environment:
                spring:
                  cloud:
                    stream:
                      rabbit:
                        binder:
                          host: localhost
    
  5. Error Handling: Spring Cloud Stream সঠিকভাবে মেসেজ ফেইলিওর হ্যান্ডলিং এবং রিট্রাই করার জন্য সমর্থন প্রদান করে।
  6. Spring Cloud Stream with Kafka:
    • Kafka একটি জনপ্রিয় মেসেজ ব্রোকার যা Spring Cloud Stream এর সঙ্গে সুসংহত কাজ করে। এতে Kafka টপিকের মাধ্যমে মেসেজ আদান-প্রদান সহজ হয় এবং ডিস্ট্রিবিউটেড মাইক্রোসার্ভিসগুলির মধ্যে নিরাপদ মেসেজিং হয়।
  7. Spring Cloud Stream with RabbitMQ:
    • RabbitMQ হল আরেকটি জনপ্রিয় মেসেজ ব্রোকার, যা সহজে Spring Cloud Stream-এ যুক্ত করা যায় এবং মেসেজিং সিস্টেম তৈরি করা যায়।

উপসংহার

Spring Cloud Stream মাইক্রোসার্ভিসের মধ্যে মেসেজ আদান-প্রদান ব্যবস্থাপনাকে সহজ করে তোলে এবং এটি Kafka, RabbitMQ সহ বিভিন্ন মেসেজ ব্রোকারের সাথে ইন্টিগ্রেশন সমর্থন করে। এটি message-driven microservices গঠনে ব্যবহৃত হয়, যার ফলে মাইক্রোসার্ভিসগুলি একে অপরের সাথে loosely coupled থাকে এবং সহজেই scalable হয়। Spring Cloud Stream ব্যবহারে মেসেজিং সিস্টেমের স্থায়িত্ব এবং পারফরম্যান্স বৃদ্ধি পায়।

Content added By

Spring Cloud Stream কি এবং এর প্রয়োজনীয়তা

59
59

Spring Cloud Stream হলো একটি মেসেজিং প্যাটার্ন লাইব্রেরি যা event-driven architecture তৈরি করতে সাহায্য করে। এটি Spring Cloud এর একটি অংশ, যা distributed messaging এবং event-driven সিস্টেম তৈরি করার জন্য ডিজাইন করা হয়েছে। Spring Cloud Stream মেসেজ ব্রোকারগুলির সাথে ইন্টিগ্রেট হয়ে অ্যাসিঙ্ক্রোনাস কমিউনিকেশন প্রতিষ্ঠা করতে সাহায্য করে, যেমন: Apache Kafka, RabbitMQ, Amazon Kinesis, ইত্যাদি।

Spring Cloud Stream মাইক্রোসার্ভিস আর্কিটেকচারে বিভিন্ন সার্ভিসের মধ্যে মেসেজ এবং ইভেন্ট প্রপ্যাগেশন সহজ করে। এটি সিস্টেমের মধ্যে কার্যকরী মেসেজিং ব্যবস্থাপনা, স্কেলেবিলিটি এবং পারফরম্যান্স প্রদান করে।

Spring Cloud Stream এর কাজ:

  1. Event-Driven Architecture:
    • Spring Cloud Stream মূলত ইভেন্ট-ড্রিভেন আর্কিটেকচারের জন্য তৈরি। এটি অ্যাসিঙ্ক্রোনাস ইভেন্ট এবং মেসেজিং সিস্টেমে কাজ করে, যেখানে সার্ভিসগুলির মধ্যে ইভেন্ট প্রপ্যাগেশন হয় এবং সার্ভিসগুলো একে অপরকে বার্তা পাঠায়।
  2. Messaging Abstraction:
    • Spring Cloud Stream একাধিক মেসেজ ব্রোকার (যেমন Kafka, RabbitMQ) এর জন্য একটি সাধারণ API প্রদান করে। এটি ডেভেলপারদের জন্য মেসেজিং সিস্টেমের মধ্যে ইন্টিগ্রেশন এবং পরিচালনা সহজ করে।
  3. Message Channels:
    • Spring Cloud Stream ব্যবহারকারীকে Message Channels এর মাধ্যমে মেসেজ পাঠাতে এবং গ্রহণ করতে দেয়। এটি সাধারণত @StreamListener বা @EnableBinding অ্যানোটেশন ব্যবহার করে মেসেজের উপর কাজ করে।
  4. Bindings and Producers/Consumers:
    • Spring Cloud Stream ব্যবহারকারীদের মেসেজ পণ্যকারী (Producer) এবং মেসেজ গ্রহণকারী (Consumer) তৈরি করতে দেয়। প্রতিটি Producer এবং Consumer একটি নির্দিষ্ট "binding" এর মাধ্যমে যোগাযোগ করে।
  5. Processing Events:
    • মেসেজটি গ্রহণ করার পর, Spring Cloud Stream মেসেজটি প্রসেস করতে পারে এবং পরবর্তী স্টেপ হিসেবে অন্য সার্ভিসে পাঠাতে পারে। এইভাবে সার্ভিসগুলির মধ্যে ইভেন্ট প্রপ্যাগেশন এবং কমিউনিকেশন সিস্টেম স্থাপন করা যায়।

Spring Cloud Stream এর প্রয়োজনীয়তা:

১. Event-Driven Architecture Support:

  • অনেক সময় বিভিন্ন সার্ভিসের মধ্যে রিয়েল-টাইম বা অ্যাসিঙ্ক্রোনাস মেসেজিং প্রয়োজন হয়। Spring Cloud Stream সহজে event-driven সিস্টেম তৈরি করতে সাহায্য করে, যেখানে সার্ভিসগুলো একে অপরের সাথে ইভেন্ট শেয়ার করে এবং ডেটা এক্সচেঞ্জ করা হয়।

২. Scalability:

  • Spring Cloud Stream মেসেজ-ভিত্তিক সিস্টেমের মাধ্যমে আর্কিটেকচারের স্কেল বৃদ্ধি করতে সাহায্য করে। এতে একটি সার্ভিসের মধ্যে সৃষ্ট ইভেন্ট বা মেসেজ সরাসরি অন্য সার্ভিসের কাছে পৌঁছানোর জন্য স্কেলযোগ্য মেকানিজম ব্যবহার করা যায়।

৩. Loose Coupling:

  • Spring Cloud Stream এর মাধ্যমে সার্ভিসগুলো একে অপরের থেকে আলাদা থাকে, অর্থাৎ loose coupling নিশ্চিত হয়। এটি একে অপরের সাথে ইভেন্ট শেয়ার করার মাধ্যমে ইন্টারঅ্যাক্ট করে, তবে সরাসরি ডিপেনডেন্ট না হয়ে। এর ফলে সিস্টেমের রিলায়েবিলিটি এবং মেইনটেনেবলিটি উন্নত হয়।

৪. Asynchronous Communication:

  • Spring Cloud Stream অ্যাসিঙ্ক্রোনাস মেসেজিং সিস্টেম সরবরাহ করে। যেখানে মেসেজ গুলি পাঠানো এবং গ্রহণ করা হয় অবিলম্বে না হয়ে একটি নির্দিষ্ট সময় পরে। এটি সার্ভিসগুলির মধ্যে সিস্টেমের কার্যকারিতা বৃদ্ধি করে, বিশেষ করে যখন সার্ভিসগুলির মধ্যে লোড ভারি থাকে।

৫. Simplified Integration with Messaging Systems:

  • Spring Cloud Stream বিভিন্ন মেসেজিং সিস্টেম (যেমন Kafka, RabbitMQ) এর সাথে সহজে ইন্টিগ্রেশন করতে পারে এবং একটি একক API প্রদান করে, যা ডেভেলপারদের জন্য সুবিধাজনক হয়। এর মাধ্যমে মেসেজ ব্রোকার কনফিগারেশন সহজে পরিচালিত হতে পারে।

৬. Fault Tolerance:

  • Spring Cloud Stream ডিস্ট্রিবিউটেড সিস্টেমে ইভেন্ট প্রসেসিং এবং ফেইলওভার মেকানিজম তৈরি করে, যা সিস্টেমের স্থিতিশীলতা নিশ্চিত করে। সার্ভিসের কাজ না করলে অন্য সার্ভিসের মাধ্যমে ইভেন্টটি পাঠানো যায়।

৭. Data Consistency and Eventual Consistency:

  • বিভিন্ন সার্ভিসের মধ্যে ডেটা ইন্টিগ্রিটি এবং কনসিস্টেন্সি বজায় রাখা খুবই গুরুত্বপূর্ণ। Spring Cloud Stream মেসেজিং ব্যবহার করে ডিস্ট্রিবিউটেড সিস্টেমে eventual consistency বজায় রাখতে সাহায্য করে।

Spring Cloud Stream এর কনফিগারেশন উদাহরণ:

Spring Cloud Stream এর একটি সাধারণ উদাহরণ যেখানে Kafka ব্যবহৃত হচ্ছে:

  1. Dependency Configuration (Maven):

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    
  2. application.yml (Kafka Configuration):

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: myTopic
              group: myGroup
            output:
              destination: myTopic
          kafka:
            binder:
              brokers: localhost:9092
    
  3. Producer (Message Sender):

    @EnableBinding(Source.class)
    public class MyProducer {
    
        private final MessageChannel output;
    
        public MyProducer(Source source) {
            this.output = source.output();
        }
    
        public void sendMessage(String message) {
            this.output.send(MessageBuilder.withPayload(message).build());
        }
    }
    
  4. Consumer (Message Listener):

    @EnableBinding(Sink.class)
    public class MyConsumer {
    
        @StreamListener(Sink.INPUT)
        public void handleMessage(String message) {
            System.out.println("Received message: " + message);
        }
    }
    
  • এখানে, Producer মেসেজ পাঠাবে myTopic নামে একটি Kafka টপিকে, এবং Consumer এই মেসেজ গ্রহণ করবে।

Spring Cloud Stream এর কিছু ব্যবহার:

  1. Event-Driven Microservices:
    • মাইক্রোসার্ভিসে যখন একটি ইভেন্ট বা ডেটা পরিবর্তন হয়, তখন তা স্বয়ংক্রিয়ভাবে অন্য সার্ভিসে প্রপ্যাগেট হতে পারে।
  2. Asynchronous Data Processing:
    • ডেটা প্রসেসিং অ্যাসিঙ্ক্রোনাসভাবে সম্পন্ন হয়, যেমন একটি সার্ভিস ডেটা প্রক্রিয়া করার পরে অন্য সার্ভিসে পাঠানো।
  3. Real-time Event Streaming:
    • Kafka বা RabbitMQ ব্যবহার করে রিয়েল-টাইম ডেটা স্ট্রিমিং প্রক্রিয়া পরিচালনা করা।
  4. Integration with Big Data Systems:
    • Spring Cloud Stream বড় ডেটা সিস্টেমের সাথে ইন্টিগ্রেশন করতে সাহায্য করে, যেমন Apache Kafka বা Apache Flume

উপসংহার:

Spring Cloud Stream একটি শক্তিশালী ফ্রেমওয়ার্ক যা মাইক্রোসার্ভিস আর্কিটেকচারে event-driven architecture এবং distributed messaging ব্যবস্থাপনা সহজ করে তোলে। এটি মাইক্রোসার্ভিসের মধ্যে ইভেন্ট প্রপ্যাগেশন, মেসেজিং, এবং ডাটা প্রসেসিংকে সহজ করে এবং সিস্টেমের স্কেল, পারফরম্যান্স এবং স্থিতিশীলতা বৃদ্ধি করতে সাহায্য করে।

যদি আরও বিস্তারিত জানতে চান বা প্রশ্ন থাকে, তবে জানাতে পারেন! 😊

Content added By

Apache Kafka এবং RabbitMQ এর সাথে Spring Cloud Stream Integration

84
84

Spring Cloud Stream হল একটি ইভেন্ট-ড্রিভেন এবং মেসেজ-ভিত্তিক অ্যাপ্লিকেশন তৈরি করার জন্য একটি ফ্রেমওয়ার্ক, যা Apache Kafka, RabbitMQ বা অন্যান্য মেসেজিং সিস্টেমের সাথে সহজে ইন্টিগ্রেট হতে পারে। Spring Cloud Stream এই মেসেজিং সিস্টেমগুলোকে ব্যবহার করে মাইক্রোসার্ভিসগুলোর মধ্যে অ্যাসিনক্রোনাস মেসেজ আদান-প্রদান সহজ করে তোলে।

নিচে Apache Kafka এবং RabbitMQ এর সাথে Spring Cloud Stream কিভাবে ইন্টিগ্রেট করা যায়, তা ব্যাখ্যা করা হয়েছে।


Spring Cloud Stream with Apache Kafka Integration

Apache Kafka হল একটি ওপেন সোর্স মেসেজ ব্রোকার, যা স্ট্রিমিং ডেটা এবং লগিং সিস্টেমের জন্য ব্যবহৃত হয়। Spring Cloud Stream Kafka এর সাথে সহজে কাজ করতে সাহায্য করে।

১. Maven ডিপেনডেন্সি

Spring Cloud Stream এবং Kafka এর সাথে ইন্টিগ্রেশন করতে আপনাকে নিচের ডিপেনডেন্সি আপনার pom.xml ফাইলে যোগ করতে হবে:

<dependencies>
    <!-- Spring Cloud Stream Dependency -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    
    <!-- Spring Boot Starter Web (if needed) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Boot Starter Messaging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
</dependencies>

২. application.yml কনফিগারেশন

Kafka এর সাথে Spring Cloud Stream কাজ করতে কিছু কনফিগারেশন সেটআপ করতে হবে। আপনার application.yml ফাইলে Kafka এর সাথে ইন্টিগ্রেশন কনফিগার করুন।

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my_topic
          group: my-group
          content-type: application/json
        output:
          destination: my_topic
          content-type: application/json
      kafka:
        binder:
          brokers: localhost:9092  # Kafka broker URL
  • input এবং output হলো Stream এর ইনপুট এবং আউটপুট চ্যানেল।
  • my_topic হল Kafka টপিকের নাম যেখানে মেসেজ পাঠানো হবে এবং গ্রহণ করা হবে।

৩. Stream Listener এবং Source তৈরি করা

Spring Cloud Stream-এর মাধ্যমে আপনি একটি Source (message producer) এবং Sink (message consumer) তৈরি করতে পারেন। এখানে @StreamListener ব্যবহার করে মেসেজ পাঠানো এবং গ্রহণ করা হয়েছে।

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

@EnableBinding({Source.class, Sink.class})
@Service
public class KafkaStreamService {

    // Message Producer
    @StreamListener(Source.OUTPUT)
    @SendTo(Source.OUTPUT)  // Sending back the message
    public String processMessage(String message) {
        System.out.println("Received message: " + message);
        return "Processed: " + message;
    }

    // Message Consumer
    @StreamListener(Sink.INPUT)
    public void consumeMessage(String message) {
        System.out.println("Consumed message: " + message);
    }
}

এই উদাহরণে, @StreamListener ব্যবহার করে আপনি Kafka টপিক থেকে মেসেজ গ্রহণ এবং পাঠাতে পারবেন।

৪. Kafka Topic Consumer & Producer তৈরি

  • Producer (Message Sender): মেসেজ প্রেরণের জন্য Source ব্যবহার করা হবে।
  • Consumer (Message Receiver): মেসেজ গ্রহণের জন্য Sink ব্যবহার করা হবে।

৫. Kafka Producer (Message Sender)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private MessageChannel output;  // output is the channel to send messages to Kafka

    public void sendMessage(String message) {
        output.send(MessageBuilder.withPayload(message).build());
    }
}

৬. Kafka Consumer (Message Receiver)

import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @StreamListener(Sink.INPUT)
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

Spring Cloud Stream with RabbitMQ Integration

RabbitMQ একটি জনপ্রিয় মেসেজ ব্রোকার, যা জটিল মেসেজিং সিস্টেমের জন্য ব্যবহৃত হয়। Spring Cloud Stream RabbitMQ এর সাথে সহজে ইন্টিগ্রেট করা যায়।

১. Maven ডিপেনডেন্সি

RabbitMQ-এর সাথে Spring Cloud Stream ইন্টিগ্রেশন করতে নিচের ডিপেনডেন্সি আপনার pom.xml ফাইলে যোগ করতে হবে:

<dependencies>
    <!-- Spring Cloud Stream RabbitMQ Dependency -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    <!-- Spring Boot Starter Web (if needed) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

২. application.yml কনফিগারেশন

RabbitMQ-এর সাথে Spring Cloud Stream ইন্টিগ্রেশন করতে নিচের কনফিগারেশন ফাইলটি যোগ করুন:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my_queue
          group: my-group
          content-type: application/json
        output:
          destination: my_queue
          content-type: application/json
      rabbit:
        binder:
          brokers: localhost:5672  # RabbitMQ broker URL

এখানে, input এবং output RabbitMQ-এর সাথে যোগাযোগের জন্য কনফিগার করা হয়েছে। my_queue RabbitMQ-তে একটি কিউ থাকবে যেখানে মেসেজ পাঠানো এবং গ্রহণ করা হবে।

৩. Stream Listener এবং Source তৈরি করা

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

@EnableBinding({Source.class, Sink.class})
@Service
public class RabbitStreamService {

    // Message Producer
    @StreamListener(Source.OUTPUT)
    @SendTo(Source.OUTPUT)  // Sending back the message
    public String processMessage(String message) {
        System.out.println("Received message: " + message);
        return "Processed: " + message;
    }

    // Message Consumer
    @StreamListener(Sink.INPUT)
    public void consumeMessage(String message) {
        System.out.println("Consumed message: " + message);
    }
}

Spring Cloud Stream Integration with Kafka এবং RabbitMQ-এর উপসংহার

  • Spring Cloud Stream সহজেই Kafka এবং RabbitMQ এর সাথে ইন্টিগ্রেট করা যায়।
  • আপনি Spring Cloud Stream ব্যবহার করে মেসেজ প্রযোজক (Producer) এবং মেসেজ গ্রহণকারী (Consumer) তৈরি করতে পারেন।
  • Apache Kafka বা RabbitMQ ব্যবহারের মাধ্যমে আপনি মাইক্রোসার্ভিস আর্কিটেকচারে অ্যাসিনক্রোনাস কমিউনিকেশন করতে পারবেন, যা পারফরম্যান্স বৃদ্ধি এবং সিস্টেমের স্কেলেবিলিটি নিশ্চিত করে।

এই ইন্টিগ্রেশন আপনাকে মাইক্রোসার্ভিস অ্যাপ্লিকেশনে ইভেন্ট-ড্রিভেন আর্কিটেকচার এবং মেসেজিং সিস্টেমের ব্যবহার সহজ করবে।

Content added By

Message Channel, Binder এবং Processor এর ধারণা

67
67

Spring Cloud Stream Overview

Spring Cloud Stream হলো একটি প্রোগ্রামিং মডেল যা মেসেজ-চালিত অ্যাপ্লিকেশন তৈরি করতে সাহায্য করে। এটি Spring Integration এবং Spring Cloud Stream এর সাহায্যে বিভিন্ন মেসেজ ব্রোকারের (যেমন Kafka, RabbitMQ) মাধ্যমে ডেটা স্ট্রিম করতে সক্ষম করে। Spring Cloud Stream, Spring Boot অ্যাপ্লিকেশনের মধ্যে Message Channels, Binders, এবং Processors ব্যবহার করে মেসেজ প্রেরণ এবং গ্রহণের কার্যপ্রণালী সহজ করে তোলে।

এই তিনটি মৌলিক ধারণা, Message Channel, Binder, এবং Processor, Spring Cloud Stream এর মূল কনসেপ্ট। এগুলির সাহায্যে আপনি মেসেজিং সিস্টেম এবং ডেটা স্ট্রিম প্রসেসিং সিস্টেম নির্মাণ করতে পারবেন।


১. Message Channel

Message Channel একটি অবজেক্ট যা একটি ডেটা স্ট্রিম (বা মেসেজ) প্রেরণ বা গ্রহণ করতে ব্যবহৃত হয়। এটি একটি অ্যাবস্ট্রাকশন লেয়ার হিসেবে কাজ করে, যা Spring Integration এর মধ্যে মেসেজ সিস্টেমের মধ্যে যোগাযোগের মাধ্যম হিসেবে কাজ করে।

Spring Cloud Stream-এ, Message Channel এর মাধ্যমে একটি মেসেজ এক স্থান থেকে অন্য স্থানে প্রেরণ করা হয়। Channel দুটি প্রধান কাজ করে:

  1. Input Channels - যেখানে মেসেজ গ্রহণ করা হয়।
  2. Output Channels - যেখানে মেসেজ প্রেরণ করা হয়।

উদাহরণ:

@Bean
public MessageChannel output() {
    return new DirectChannel();
}

@Bean
public MessageChannel input() {
    return new DirectChannel();
}

এখানে output() এবং input() মেসেজ চ্যানেল হিসেবে ব্যবহৃত হচ্ছে। output() চ্যানেল মেসেজ প্রেরণ করবে, এবং input() চ্যানেল মেসেজ গ্রহণ করবে।

২. Binder

Binder হলো একটি অ্যাবস্ট্রাকশন লেয়ার যা মেসেজ চ্যানেলকে একটি মেসেজ ব্রোকারের সাথে সংযুক্ত করে। Spring Cloud Stream-এর মধ্যে Binder একটি ক্লাস যা মেসেজ চ্যানেলগুলিকে একটি মেসেজ ব্রোকার (যেমন Kafka, RabbitMQ) এর সাথে কানেক্ট করে।

Binder দুইটি কাজ করে:

  1. মেসেজ চ্যানেলগুলোকে নির্দিষ্ট মেসেজ ব্রোকারের সাথে যুক্ত করা।
  2. চ্যানেলগুলির মাধ্যমে ডেটা পাঠানো এবং গ্রহণ করা।

Spring Cloud Stream বিভিন্ন মেসেজ ব্রোকার সাপোর্ট করে এবং Binder সেই ব্রোকারের সঙ্গে ইন্টিগ্রেটেড হয়ে কাজ করে। যেমন Kafka, RabbitMQ ইত্যাদি।

উদাহরণ:

@EnableBinding(Source.class) // Source class থেকে Output Channel দ্বারা Binder অটোমেটিকভাবে ব্যাবহার করা হয়
public class MessageProducer {

    @Autowired
    private MessageChannel output;

    public void sendMessage(String message) {
        output.send(MessageBuilder.withPayload(message).build());
    }
}

এখানে, Source.class Spring Cloud Stream এর একটি ইন্টারফেস যা output চ্যানেল তৈরি করবে। Binder এটি Kafka বা RabbitMQ এর সাথে সংযুক্ত করবে।

Kafka Binder Configuration Example:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-topic
          binder: kafka
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              kafka:
                bootstrap-servers: localhost:9092

এখানে, spring.cloud.stream.bindings.output.destination এর মাধ্যমে my-topic নামক একটি Kafka টপিকের সাথে মেসেজ চ্যানেল যুক্ত করা হয়েছে।

৩. Processor

Processor হলো Spring Cloud Stream এর একটি কাস্টম ইন্টারফেস যা মেসেজ প্রক্রিয়া করার জন্য ব্যবহৃত হয়। একটি Processor হলো একটি @StreamListener এর মতো, কিন্তু এটি মেসেজকে একটি চ্যানেল থেকে নিয়ে অন্য একটি চ্যানেলে প্রেরণ করে। এতে ইনপুট এবং আউটপুট চ্যানেল থাকে, যা মেসেজ প্রসেসিং এবং রাউটিং করতে ব্যবহৃত হয়।

Spring Cloud Stream-এ Processor অ্যাপ্লিকেশন তৈরি করার মাধ্যমে আপনি সহজেই ইনপুট মেসেজ গ্রহণ করে সেগুলিকে আউটপুট চ্যানেলে প্রেরণ করতে পারেন।

উদাহরণ:

@EnableBinding(Processor.class)  // Processor ইন্টারফেসটি ইনপুট এবং আউটপুট চ্যানেলকে ব্যাবহার করে
public class MessageProcessor {

    @StreamListener(Processor.INPUT)   // INPUT চ্যানেল থেকে মেসেজ গ্রহণ
    @SendTo(Processor.OUTPUT)          // OUTPUT চ্যানেলে মেসেজ পাঠানো
    public String processMessage(String message) {
        return "Processed: " + message; // মেসেজ প্রসেস করা
    }
}

এখানে, Processor.INPUT চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে এবং Processor.OUTPUT চ্যানেল দিয়ে প্রসেসড মেসেজ আউটপুট হিসেবে প্রেরণ করা হচ্ছে।

Spring Cloud Stream Example with Kafka

Spring Cloud Stream এর সাথে Kafka ব্যবহার করার একটি উদাহরণ দেখানো হল।

১. Maven Dependency

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

এখানে spring-cloud-starter-stream-kafka ডিপেনডেন্সি Kafka Binder ব্যবহার করার জন্য প্রয়োজনীয় কনফিগারেশন সরবরাহ করে।

২. application.yml Configuration

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: input-topic
          group: my-group
        output:
          destination: output-topic
      binder:
        kafka:
          type: kafka
          environment:
            spring:
              kafka:
                bootstrap-servers: localhost:9092

এখানে, input-topic এবং output-topic দুটি Kafka টপিকের সাথে মেসেজ চ্যানেল যুক্ত করা হয়েছে। spring.kafka.bootstrap-servers দিয়ে Kafka সার্ভারের অবস্থান নির্ধারণ করা হয়েছে।

৩. StreamListener and Processor

@EnableBinding(Processor.class)
public class KafkaProcessor {

    @StreamListener(Processor.INPUT)   // ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ
    @SendTo(Processor.OUTPUT)          // আউটপুট চ্যানেলে মেসেজ প্রেরণ
    public String process(String message) {
        return "Processed Message: " + message;
    }
}

এখানে, ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে এবং আউটপুট চ্যানেলে প্রক্রিয়াকৃত মেসেজ পাঠানো হচ্ছে।


সারাংশ

Spring Cloud Stream এর মাধ্যমে মেসেজ-চালিত অ্যাপ্লিকেশন তৈরি করা যায়, যেখানে Message Channel, Binder, এবং Processor হলো মূল উপাদান।

  1. Message Channel - এটি মেসেজ প্রেরণ এবং গ্রহণ করার জন্য ব্যবহৃত হয়।
  2. Binder - এটি মেসেজ চ্যানেলকে মেসেজ ব্রোকারের সাথে যুক্ত করার কাজ করে।
  3. Processor - এটি ইনপুট মেসেজ প্রক্রিয়া করে আউটপুট চ্যানেলে পাঠানোর কাজ করে।

Spring Cloud Stream এবং Kafka বা RabbitMQ এর মতো মেসেজ ব্রোকার ব্যবহার করে ডিস্ট্রিবিউটেড সিস্টেমে মেসেজিং এবং স্ট্রিম প্রসেসিং সহজে করা যায়, যা মাইক্রোসার্ভিস আর্কিটেকচারে কার্যকরী সমাধান প্রদান করে।

Content added By

উদাহরণ সহ Spring Cloud Stream ব্যবহার

60
60

Spring Cloud Stream হল একটি মেসেজিং ও ইভেন্ট-ড্রিভেন মাইক্রোসার্ভিস আর্কিটেকচারের জন্য ব্যবহৃত ফ্রেমওয়ার্ক। এটি Apache Kafka, RabbitMQ, এবং অন্যান্য মেসেজিং সিস্টেমের সঙ্গে ইন্টিগ্রেটেড মেসেজিং সাপোর্ট প্রদান করে। Spring Cloud Stream ইভেন্ট-ড্রিভেন আর্কিটেকচারে producer এবং consumer মডেল ব্যবহার করে, যেখানে producer মেসেজ পাঠায় এবং consumer সেই মেসেজ গ্রহণ করে।

এখানে Spring Cloud Stream ব্যবহার করার জন্য একটি উদাহরণ দেওয়া হচ্ছে, যেখানে producer এবং consumer মাইক্রোসার্ভিস তৈরি করা হবে এবং একটি মেসেজ কিউ (যেমন RabbitMQ বা Kafka) এর মাধ্যমে মেসেজ প্রেরণ ও গ্রহণ করা হবে।


Spring Cloud Stream উদাহরণ

আমরা দুটি অ্যাপ্লিকেশন তৈরি করবো:

  1. Producer Service – এটি একটি মেসেজ প্রেরণ করবে।
  2. Consumer Service – এটি মেসেজ গ্রহণ করবে এবং প্রসেস করবে।

এখানে, আমরা RabbitMQ ব্যবহার করব মেসেজ ব্রোকার হিসেবে।


১. Maven Dependencies

Producer Service - Maven Dependency:

Producer সার্ভিসে Spring Cloud Stream এবং RabbitMQ কনফিগারেশন করতে হবে।

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Consumer Service - Maven Dependency:

Consumer সার্ভিসের জন্যও একইভাবে Spring Cloud Stream এবং RabbitMQ কনফিগারেশন করতে হবে।

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

২. Producer Service - তৈরি করা

Producer সার্ভিস একটি মেসেজ পাঠাবে RabbitMQ এ।

Producer Service - application.yml:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: messages
          content-type: application/json
      rabbit:
        binder:
          nodes: localhost

Producer Service - Message Producer:

Producer Service এ একটি মেসেজ তৈরি এবং পাঠানোর জন্য একটি service class তৈরি করা হবে।

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducerController {

    private final StreamBridge streamBridge;

    public MessageProducerController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @GetMapping("/sendMessage")
    public String sendMessage() {
        String message = "Hello from Producer Service!";
        streamBridge.send("output", message);
        return "Message Sent: " + message;
    }
}
  • এখানে, StreamBridge ব্যবহার করা হচ্ছে যেটি Spring Cloud Stream এর মাধ্যমে মেসেজ প্রেরণ করে।
  • output হল মেসেজ সেন্টারে (RabbitMQ Queue) কনফিগার করা ডেস্টিনেশন।

৩. Consumer Service - তৈরি করা

Consumer সার্ভিস মেসেজ গ্রহণ করবে এবং প্রসেস করবে।

Consumer Service - application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: messages
          content-type: application/json
      rabbit:
        binder:
          nodes: localhost

Consumer Service - Message Consumer:

Consumer Service এ একটি listener তৈরি করা হবে, যা RabbitMQ থেকে মেসেজ গ্রহণ করবে এবং তা প্রসেস করবে।

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

@EnableBinding(Sink.class)
@Service
public class MessageConsumerService implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("Received message: " + message.getPayload());
    }
}
  • এখানে, Sink হল একটি Spring Cloud Stream @EnableBinding ইনটারফেস যা ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করে।
  • handleMessage() মেথডটি RabbitMQ থেকে আসা মেসেজ গ্রহণ এবং প্রসেস করে।

৪. RabbitMQ Configuration

এখানে, আমরা RabbitMQ ব্যবহার করছি মেসেজ ব্রোকার হিসেবে। নিশ্চিত করুন যে RabbitMQ আপনার মেশিনে চলমান আছে বা আপনি RabbitMQ Docker কনটেইনার ব্যবহার করছেন।

RabbitMQ Docker (যদি প্রয়োজন হয়):

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

এই কমান্ডটি RabbitMQ সার্ভার এবং ম্যানেজমেন্ট কনসোল চালু করবে। আপনি http://localhost:15672 এ RabbitMQ কনসোল অ্যাক্সেস করতে পারবেন।


৫. Application চালানো

  1. প্রথমে Consumer Service চালু করুন:

    mvn spring-boot:run -Dspring-boot.run.profiles=consumer
    
  2. তারপর Producer Service চালু করুন:

    mvn spring-boot:run -Dspring-boot.run.profiles=producer
    

৬. মেসেজ পাঠানো এবং গ্রহণ করা

  1. Producer Service থেকে /sendMessage পাথ রিকোয়েস্ট পাঠিয়ে মেসেজ পাঠান:

    GET http://localhost:8081/sendMessage
    
  2. মেসেজটি RabbitMQ কিউতে পৌঁছানোর পর Consumer Service এটি গ্রহণ করবে এবং কনসোল এ প্রদর্শিত হবে:

    Received message: Hello from Producer Service!
    

Conclusion

এভাবে, Spring Cloud Stream ব্যবহার করে আপনি RabbitMQ এর মাধ্যমে মেসেজ প্রেরণ এবং গ্রহণ করতে পারেন। এই উদাহরণে, Producer মাইক্রোসার্ভিস মেসেজ পাঠাচ্ছে এবং Consumer মাইক্রোসার্ভিস সেটি গ্রহণ করছে। আপনি এটি Kafka বা অন্য মেসেজ ব্রোকারের সাথে ইন্টিগ্রেট করেও ব্যবহার করতে পারেন। Spring Cloud Stream সহজে ইভেন্ট-ড্রিভেন মাইক্রোসার্ভিস আর্কিটেকচার তৈরি করতে সহায়ক।

Content added By
টপ রেটেড অ্যাপ

স্যাট অ্যাকাডেমী অ্যাপ

আমাদের অল-ইন-ওয়ান মোবাইল অ্যাপের মাধ্যমে সীমাহীন শেখার সুযোগ উপভোগ করুন।

ভিডিও
লাইভ ক্লাস
এক্সাম
ডাউনলোড করুন
Promotion