Spring Cloud Stream হল একটি হাই-এবস্ট্রাক্ট, ইভেন্ট-ড্রিভেন এবং মেসেজ-বেসড মাইক্রোসার্ভিস আর্কিটেকচার তৈরি করার জন্য ব্যবহৃত একটি ফ্রেমওয়ার্ক। এটি মাইক্রোসার্ভিসের মধ্যে এসিনক্রোনাস মেসেজ কমিউনিকেশন সমর্থন করে এবং বিভিন্ন মেসেজ ব্রোকার (যেমন RabbitMQ, Kafka) এর সাথে ইন্টিগ্রেটেড হয়। Spring Cloud Stream সার্ভিসগুলোকে আলাদা ও স্বতন্ত্রভাবে কাজ করতে সহায়ক করে এবং যোগাযোগের জন্য মেসেজ কিউ ব্যবহার করে।
প্রথমে Spring Cloud Stream এবং Kafka/RabbitMQ ডিপেন্ডেন্সি আপনার pom.xml
ফাইলে যোগ করুন।
Example (Kafka):
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Cloud Stream with Kafka Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Spring Cloud Stream Binder Dependency for Kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
Example (RabbitMQ):
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Cloud Stream with RabbitMQ Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- Spring Cloud Stream Binder Dependency for RabbitMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
application.yml
বা application.properties
ফাইলে Spring Cloud Stream কনফিগারেশন যোগ করুন। এখানে Kafka অথবা RabbitMQ ব্যবহার করা যেতে পারে। নিচে Kafka এর কনফিগারেশন দেখানো হয়েছে।
application.yml (Kafka Configuration):
spring:
cloud:
stream:
bindings:
output:
destination: my-topic
input:
destination: my-topic
kafka:
binder:
brokers: localhost:9092 # Kafka broker address
consumer:
group: my-group
এখানে:
output
: মেসেজ আউটপুট সেলুলার চ্যানেল।input
: মেসেজ ইনপুট সেলুলার চ্যানেল।my-topic
: Kafka টপিকের নাম।brokers
: Kafka ব্রোকারের ঠিকানা।Spring Cloud Stream ব্যবহার করে আপনি মেসেজ প্রেরণ করতে পারেন। এই জন্য আপনি MessageChannel ব্যবহার করতে পারেন। নিচে একটি message producer উদাহরণ দেখানো হলো।
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(Source.class) // Binding to the output channel
public class MessageProducer {
private final Source source;
public MessageProducer(Source source) {
this.source = source;
}
public void sendMessage(String message) {
source.output().send(MessageBuilder.withPayload(message).build()); // Sending message to the output channel
}
}
এখানে, Source
ক্লাস ব্যবহার করা হয়েছে যা output channel কে প্রোস্ট করা হয়েছে। মেসেজ প্রেরণের জন্য MessageBuilder
ব্যবহার করা হয়েছে।
Message Consumer দ্বারা মেসেজ গ্রহণ করা হয়। এখানে MessageChannel ব্যবহার করে ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে।
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(Source.class) // Binding to input channel
public class MessageConsumer {
@StreamListener(target = Source.INPUT) // Listening to input channel
public void handleMessage(String message) {
System.out.println("Received message: " + message); // Process the received message
}
}
এখানে, @StreamListener
অ্যানোটেশন ব্যবহার করা হয়েছে, যা input channel থেকে মেসেজ গ্রহণ করে এবং প্রসেস করে।
BindingProperties: বিভিন্ন মেসেজ ব্রোকার কনফিগারেশন সমর্থন (যেমন RabbitMQ, Kafka, Redis) করা যায়।
Example (Multiple Binder Config):
spring:
cloud:
stream:
bindings:
output:
destination: my-topic
binder: kafka
input:
destination: my-topic
binder: rabbit
binders:
kafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
rabbit:
type: rabbit
environment:
spring:
cloud:
stream:
rabbit:
binder:
host: localhost
Spring Cloud Stream মাইক্রোসার্ভিসের মধ্যে মেসেজ আদান-প্রদান ব্যবস্থাপনাকে সহজ করে তোলে এবং এটি Kafka, RabbitMQ সহ বিভিন্ন মেসেজ ব্রোকারের সাথে ইন্টিগ্রেশন সমর্থন করে। এটি message-driven microservices গঠনে ব্যবহৃত হয়, যার ফলে মাইক্রোসার্ভিসগুলি একে অপরের সাথে loosely coupled থাকে এবং সহজেই scalable হয়। Spring Cloud Stream ব্যবহারে মেসেজিং সিস্টেমের স্থায়িত্ব এবং পারফরম্যান্স বৃদ্ধি পায়।
Spring Cloud Stream হলো একটি মেসেজিং প্যাটার্ন লাইব্রেরি যা event-driven architecture তৈরি করতে সাহায্য করে। এটি Spring Cloud এর একটি অংশ, যা distributed messaging এবং event-driven সিস্টেম তৈরি করার জন্য ডিজাইন করা হয়েছে। Spring Cloud Stream মেসেজ ব্রোকারগুলির সাথে ইন্টিগ্রেট হয়ে অ্যাসিঙ্ক্রোনাস কমিউনিকেশন প্রতিষ্ঠা করতে সাহায্য করে, যেমন: Apache Kafka, RabbitMQ, Amazon Kinesis, ইত্যাদি।
Spring Cloud Stream মাইক্রোসার্ভিস আর্কিটেকচারে বিভিন্ন সার্ভিসের মধ্যে মেসেজ এবং ইভেন্ট প্রপ্যাগেশন সহজ করে। এটি সিস্টেমের মধ্যে কার্যকরী মেসেজিং ব্যবস্থাপনা, স্কেলেবিলিটি এবং পারফরম্যান্স প্রদান করে।
@StreamListener
বা @EnableBinding
অ্যানোটেশন ব্যবহার করে মেসেজের উপর কাজ করে।Spring Cloud Stream এর একটি সাধারণ উদাহরণ যেখানে Kafka ব্যবহৃত হচ্ছে:
Dependency Configuration (Maven):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
application.yml (Kafka Configuration):
spring:
cloud:
stream:
bindings:
input:
destination: myTopic
group: myGroup
output:
destination: myTopic
kafka:
binder:
brokers: localhost:9092
Producer (Message Sender):
@EnableBinding(Source.class)
public class MyProducer {
private final MessageChannel output;
public MyProducer(Source source) {
this.output = source.output();
}
public void sendMessage(String message) {
this.output.send(MessageBuilder.withPayload(message).build());
}
}
Consumer (Message Listener):
@EnableBinding(Sink.class)
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void handleMessage(String message) {
System.out.println("Received message: " + message);
}
}
myTopic
নামে একটি Kafka টপিকে, এবং Consumer এই মেসেজ গ্রহণ করবে।Spring Cloud Stream একটি শক্তিশালী ফ্রেমওয়ার্ক যা মাইক্রোসার্ভিস আর্কিটেকচারে event-driven architecture এবং distributed messaging ব্যবস্থাপনা সহজ করে তোলে। এটি মাইক্রোসার্ভিসের মধ্যে ইভেন্ট প্রপ্যাগেশন, মেসেজিং, এবং ডাটা প্রসেসিংকে সহজ করে এবং সিস্টেমের স্কেল, পারফরম্যান্স এবং স্থিতিশীলতা বৃদ্ধি করতে সাহায্য করে।
যদি আরও বিস্তারিত জানতে চান বা প্রশ্ন থাকে, তবে জানাতে পারেন! 😊
Spring Cloud Stream হল একটি ইভেন্ট-ড্রিভেন এবং মেসেজ-ভিত্তিক অ্যাপ্লিকেশন তৈরি করার জন্য একটি ফ্রেমওয়ার্ক, যা Apache Kafka, RabbitMQ বা অন্যান্য মেসেজিং সিস্টেমের সাথে সহজে ইন্টিগ্রেট হতে পারে। Spring Cloud Stream এই মেসেজিং সিস্টেমগুলোকে ব্যবহার করে মাইক্রোসার্ভিসগুলোর মধ্যে অ্যাসিনক্রোনাস মেসেজ আদান-প্রদান সহজ করে তোলে।
নিচে Apache Kafka এবং RabbitMQ এর সাথে Spring Cloud Stream কিভাবে ইন্টিগ্রেট করা যায়, তা ব্যাখ্যা করা হয়েছে।
Apache Kafka হল একটি ওপেন সোর্স মেসেজ ব্রোকার, যা স্ট্রিমিং ডেটা এবং লগিং সিস্টেমের জন্য ব্যবহৃত হয়। Spring Cloud Stream Kafka এর সাথে সহজে কাজ করতে সাহায্য করে।
Spring Cloud Stream এবং Kafka এর সাথে ইন্টিগ্রেশন করতে আপনাকে নিচের ডিপেনডেন্সি আপনার pom.xml
ফাইলে যোগ করতে হবে:
<dependencies>
<!-- Spring Cloud Stream Dependency -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Spring Boot Starter Web (if needed) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Messaging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
application.yml
কনফিগারেশনKafka এর সাথে Spring Cloud Stream কাজ করতে কিছু কনফিগারেশন সেটআপ করতে হবে। আপনার application.yml
ফাইলে Kafka এর সাথে ইন্টিগ্রেশন কনফিগার করুন।
spring:
cloud:
stream:
bindings:
input:
destination: my_topic
group: my-group
content-type: application/json
output:
destination: my_topic
content-type: application/json
kafka:
binder:
brokers: localhost:9092 # Kafka broker URL
input
এবং output
হলো Stream এর ইনপুট এবং আউটপুট চ্যানেল।my_topic
হল Kafka টপিকের নাম যেখানে মেসেজ পাঠানো হবে এবং গ্রহণ করা হবে।Spring Cloud Stream-এর মাধ্যমে আপনি একটি Source (message producer) এবং Sink (message consumer) তৈরি করতে পারেন। এখানে @StreamListener ব্যবহার করে মেসেজ পাঠানো এবং গ্রহণ করা হয়েছে।
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
@EnableBinding({Source.class, Sink.class})
@Service
public class KafkaStreamService {
// Message Producer
@StreamListener(Source.OUTPUT)
@SendTo(Source.OUTPUT) // Sending back the message
public String processMessage(String message) {
System.out.println("Received message: " + message);
return "Processed: " + message;
}
// Message Consumer
@StreamListener(Sink.INPUT)
public void consumeMessage(String message) {
System.out.println("Consumed message: " + message);
}
}
এই উদাহরণে, @StreamListener ব্যবহার করে আপনি Kafka টপিক থেকে মেসেজ গ্রহণ এবং পাঠাতে পারবেন।
Source
ব্যবহার করা হবে।Sink
ব্যবহার করা হবে।import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private MessageChannel output; // output is the channel to send messages to Kafka
public void sendMessage(String message) {
output.send(MessageBuilder.withPayload(message).build());
}
}
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@StreamListener(Sink.INPUT)
public void consume(String message) {
System.out.println("Consumed message: " + message);
}
}
RabbitMQ একটি জনপ্রিয় মেসেজ ব্রোকার, যা জটিল মেসেজিং সিস্টেমের জন্য ব্যবহৃত হয়। Spring Cloud Stream RabbitMQ এর সাথে সহজে ইন্টিগ্রেট করা যায়।
RabbitMQ-এর সাথে Spring Cloud Stream ইন্টিগ্রেশন করতে নিচের ডিপেনডেন্সি আপনার pom.xml
ফাইলে যোগ করতে হবে:
<dependencies>
<!-- Spring Cloud Stream RabbitMQ Dependency -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- Spring Boot Starter Web (if needed) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
application.yml
কনফিগারেশনRabbitMQ-এর সাথে Spring Cloud Stream ইন্টিগ্রেশন করতে নিচের কনফিগারেশন ফাইলটি যোগ করুন:
spring:
cloud:
stream:
bindings:
input:
destination: my_queue
group: my-group
content-type: application/json
output:
destination: my_queue
content-type: application/json
rabbit:
binder:
brokers: localhost:5672 # RabbitMQ broker URL
এখানে, input
এবং output
RabbitMQ-এর সাথে যোগাযোগের জন্য কনফিগার করা হয়েছে। my_queue
RabbitMQ-তে একটি কিউ থাকবে যেখানে মেসেজ পাঠানো এবং গ্রহণ করা হবে।
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
@EnableBinding({Source.class, Sink.class})
@Service
public class RabbitStreamService {
// Message Producer
@StreamListener(Source.OUTPUT)
@SendTo(Source.OUTPUT) // Sending back the message
public String processMessage(String message) {
System.out.println("Received message: " + message);
return "Processed: " + message;
}
// Message Consumer
@StreamListener(Sink.INPUT)
public void consumeMessage(String message) {
System.out.println("Consumed message: " + message);
}
}
এই ইন্টিগ্রেশন আপনাকে মাইক্রোসার্ভিস অ্যাপ্লিকেশনে ইভেন্ট-ড্রিভেন আর্কিটেকচার এবং মেসেজিং সিস্টেমের ব্যবহার সহজ করবে।
Spring Cloud Stream হলো একটি প্রোগ্রামিং মডেল যা মেসেজ-চালিত অ্যাপ্লিকেশন তৈরি করতে সাহায্য করে। এটি Spring Integration এবং Spring Cloud Stream এর সাহায্যে বিভিন্ন মেসেজ ব্রোকারের (যেমন Kafka, RabbitMQ) মাধ্যমে ডেটা স্ট্রিম করতে সক্ষম করে। Spring Cloud Stream, Spring Boot অ্যাপ্লিকেশনের মধ্যে Message Channels, Binders, এবং Processors ব্যবহার করে মেসেজ প্রেরণ এবং গ্রহণের কার্যপ্রণালী সহজ করে তোলে।
এই তিনটি মৌলিক ধারণা, Message Channel, Binder, এবং Processor, Spring Cloud Stream এর মূল কনসেপ্ট। এগুলির সাহায্যে আপনি মেসেজিং সিস্টেম এবং ডেটা স্ট্রিম প্রসেসিং সিস্টেম নির্মাণ করতে পারবেন।
Message Channel একটি অবজেক্ট যা একটি ডেটা স্ট্রিম (বা মেসেজ) প্রেরণ বা গ্রহণ করতে ব্যবহৃত হয়। এটি একটি অ্যাবস্ট্রাকশন লেয়ার হিসেবে কাজ করে, যা Spring Integration এর মধ্যে মেসেজ সিস্টেমের মধ্যে যোগাযোগের মাধ্যম হিসেবে কাজ করে।
Spring Cloud Stream-এ, Message Channel এর মাধ্যমে একটি মেসেজ এক স্থান থেকে অন্য স্থানে প্রেরণ করা হয়। Channel দুটি প্রধান কাজ করে:
উদাহরণ:
@Bean
public MessageChannel output() {
return new DirectChannel();
}
@Bean
public MessageChannel input() {
return new DirectChannel();
}
এখানে output()
এবং input()
মেসেজ চ্যানেল হিসেবে ব্যবহৃত হচ্ছে। output()
চ্যানেল মেসেজ প্রেরণ করবে, এবং input()
চ্যানেল মেসেজ গ্রহণ করবে।
Binder হলো একটি অ্যাবস্ট্রাকশন লেয়ার যা মেসেজ চ্যানেলকে একটি মেসেজ ব্রোকারের সাথে সংযুক্ত করে। Spring Cloud Stream-এর মধ্যে Binder একটি ক্লাস যা মেসেজ চ্যানেলগুলিকে একটি মেসেজ ব্রোকার (যেমন Kafka, RabbitMQ) এর সাথে কানেক্ট করে।
Binder দুইটি কাজ করে:
Spring Cloud Stream বিভিন্ন মেসেজ ব্রোকার সাপোর্ট করে এবং Binder সেই ব্রোকারের সঙ্গে ইন্টিগ্রেটেড হয়ে কাজ করে। যেমন Kafka, RabbitMQ ইত্যাদি।
উদাহরণ:
@EnableBinding(Source.class) // Source class থেকে Output Channel দ্বারা Binder অটোমেটিকভাবে ব্যাবহার করা হয়
public class MessageProducer {
@Autowired
private MessageChannel output;
public void sendMessage(String message) {
output.send(MessageBuilder.withPayload(message).build());
}
}
এখানে, Source.class
Spring Cloud Stream এর একটি ইন্টারফেস যা output
চ্যানেল তৈরি করবে। Binder এটি Kafka বা RabbitMQ এর সাথে সংযুক্ত করবে।
Kafka Binder Configuration Example:
spring:
cloud:
stream:
bindings:
output:
destination: my-topic
binder: kafka
binders:
kafka:
type: kafka
environment:
spring:
kafka:
bootstrap-servers: localhost:9092
এখানে, spring.cloud.stream.bindings.output.destination
এর মাধ্যমে my-topic
নামক একটি Kafka টপিকের সাথে মেসেজ চ্যানেল যুক্ত করা হয়েছে।
Processor হলো Spring Cloud Stream এর একটি কাস্টম ইন্টারফেস যা মেসেজ প্রক্রিয়া করার জন্য ব্যবহৃত হয়। একটি Processor হলো একটি @StreamListener
এর মতো, কিন্তু এটি মেসেজকে একটি চ্যানেল থেকে নিয়ে অন্য একটি চ্যানেলে প্রেরণ করে। এতে ইনপুট এবং আউটপুট চ্যানেল থাকে, যা মেসেজ প্রসেসিং এবং রাউটিং করতে ব্যবহৃত হয়।
Spring Cloud Stream-এ Processor অ্যাপ্লিকেশন তৈরি করার মাধ্যমে আপনি সহজেই ইনপুট মেসেজ গ্রহণ করে সেগুলিকে আউটপুট চ্যানেলে প্রেরণ করতে পারেন।
উদাহরণ:
@EnableBinding(Processor.class) // Processor ইন্টারফেসটি ইনপুট এবং আউটপুট চ্যানেলকে ব্যাবহার করে
public class MessageProcessor {
@StreamListener(Processor.INPUT) // INPUT চ্যানেল থেকে মেসেজ গ্রহণ
@SendTo(Processor.OUTPUT) // OUTPUT চ্যানেলে মেসেজ পাঠানো
public String processMessage(String message) {
return "Processed: " + message; // মেসেজ প্রসেস করা
}
}
এখানে, Processor.INPUT
চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে এবং Processor.OUTPUT
চ্যানেল দিয়ে প্রসেসড মেসেজ আউটপুট হিসেবে প্রেরণ করা হচ্ছে।
Spring Cloud Stream এর সাথে Kafka ব্যবহার করার একটি উদাহরণ দেখানো হল।
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
এখানে spring-cloud-starter-stream-kafka
ডিপেনডেন্সি Kafka Binder ব্যবহার করার জন্য প্রয়োজনীয় কনফিগারেশন সরবরাহ করে।
spring:
cloud:
stream:
bindings:
input:
destination: input-topic
group: my-group
output:
destination: output-topic
binder:
kafka:
type: kafka
environment:
spring:
kafka:
bootstrap-servers: localhost:9092
এখানে, input-topic
এবং output-topic
দুটি Kafka টপিকের সাথে মেসেজ চ্যানেল যুক্ত করা হয়েছে। spring.kafka.bootstrap-servers
দিয়ে Kafka সার্ভারের অবস্থান নির্ধারণ করা হয়েছে।
@EnableBinding(Processor.class)
public class KafkaProcessor {
@StreamListener(Processor.INPUT) // ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ
@SendTo(Processor.OUTPUT) // আউটপুট চ্যানেলে মেসেজ প্রেরণ
public String process(String message) {
return "Processed Message: " + message;
}
}
এখানে, ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করা হচ্ছে এবং আউটপুট চ্যানেলে প্রক্রিয়াকৃত মেসেজ পাঠানো হচ্ছে।
Spring Cloud Stream এর মাধ্যমে মেসেজ-চালিত অ্যাপ্লিকেশন তৈরি করা যায়, যেখানে Message Channel, Binder, এবং Processor হলো মূল উপাদান।
Spring Cloud Stream এবং Kafka বা RabbitMQ এর মতো মেসেজ ব্রোকার ব্যবহার করে ডিস্ট্রিবিউটেড সিস্টেমে মেসেজিং এবং স্ট্রিম প্রসেসিং সহজে করা যায়, যা মাইক্রোসার্ভিস আর্কিটেকচারে কার্যকরী সমাধান প্রদান করে।
Spring Cloud Stream হল একটি মেসেজিং ও ইভেন্ট-ড্রিভেন মাইক্রোসার্ভিস আর্কিটেকচারের জন্য ব্যবহৃত ফ্রেমওয়ার্ক। এটি Apache Kafka, RabbitMQ, এবং অন্যান্য মেসেজিং সিস্টেমের সঙ্গে ইন্টিগ্রেটেড মেসেজিং সাপোর্ট প্রদান করে। Spring Cloud Stream ইভেন্ট-ড্রিভেন আর্কিটেকচারে producer এবং consumer মডেল ব্যবহার করে, যেখানে producer মেসেজ পাঠায় এবং consumer সেই মেসেজ গ্রহণ করে।
এখানে Spring Cloud Stream ব্যবহার করার জন্য একটি উদাহরণ দেওয়া হচ্ছে, যেখানে producer এবং consumer মাইক্রোসার্ভিস তৈরি করা হবে এবং একটি মেসেজ কিউ (যেমন RabbitMQ বা Kafka) এর মাধ্যমে মেসেজ প্রেরণ ও গ্রহণ করা হবে।
আমরা দুটি অ্যাপ্লিকেশন তৈরি করবো:
এখানে, আমরা RabbitMQ ব্যবহার করব মেসেজ ব্রোকার হিসেবে।
Producer সার্ভিসে Spring Cloud Stream এবং RabbitMQ কনফিগারেশন করতে হবে।
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Consumer সার্ভিসের জন্যও একইভাবে Spring Cloud Stream এবং RabbitMQ কনফিগারেশন করতে হবে।
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Producer সার্ভিস একটি মেসেজ পাঠাবে RabbitMQ এ।
application.yml
:spring:
cloud:
stream:
bindings:
output:
destination: messages
content-type: application/json
rabbit:
binder:
nodes: localhost
Producer Service এ একটি মেসেজ তৈরি এবং পাঠানোর জন্য একটি service class তৈরি করা হবে।
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageProducerController {
private final StreamBridge streamBridge;
public MessageProducerController(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@GetMapping("/sendMessage")
public String sendMessage() {
String message = "Hello from Producer Service!";
streamBridge.send("output", message);
return "Message Sent: " + message;
}
}
StreamBridge
ব্যবহার করা হচ্ছে যেটি Spring Cloud Stream এর মাধ্যমে মেসেজ প্রেরণ করে।output
হল মেসেজ সেন্টারে (RabbitMQ Queue) কনফিগার করা ডেস্টিনেশন।Consumer সার্ভিস মেসেজ গ্রহণ করবে এবং প্রসেস করবে।
application.yml
:spring:
cloud:
stream:
bindings:
input:
destination: messages
content-type: application/json
rabbit:
binder:
nodes: localhost
Consumer Service এ একটি listener তৈরি করা হবে, যা RabbitMQ থেকে মেসেজ গ্রহণ করবে এবং তা প্রসেস করবে।
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
@EnableBinding(Sink.class)
@Service
public class MessageConsumerService implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("Received message: " + message.getPayload());
}
}
Sink
হল একটি Spring Cloud Stream @EnableBinding
ইনটারফেস যা ইনপুট চ্যানেল থেকে মেসেজ গ্রহণ করে।handleMessage()
মেথডটি RabbitMQ থেকে আসা মেসেজ গ্রহণ এবং প্রসেস করে।এখানে, আমরা RabbitMQ ব্যবহার করছি মেসেজ ব্রোকার হিসেবে। নিশ্চিত করুন যে RabbitMQ আপনার মেশিনে চলমান আছে বা আপনি RabbitMQ Docker কনটেইনার ব্যবহার করছেন।
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
এই কমান্ডটি RabbitMQ সার্ভার এবং ম্যানেজমেন্ট কনসোল চালু করবে। আপনি http://localhost:15672 এ RabbitMQ কনসোল অ্যাক্সেস করতে পারবেন।
প্রথমে Consumer Service চালু করুন:
mvn spring-boot:run -Dspring-boot.run.profiles=consumer
তারপর Producer Service চালু করুন:
mvn spring-boot:run -Dspring-boot.run.profiles=producer
Producer Service থেকে /sendMessage
পাথ রিকোয়েস্ট পাঠিয়ে মেসেজ পাঠান:
GET http://localhost:8081/sendMessage
মেসেজটি RabbitMQ কিউতে পৌঁছানোর পর Consumer Service এটি গ্রহণ করবে এবং কনসোল এ প্রদর্শিত হবে:
Received message: Hello from Producer Service!
এভাবে, Spring Cloud Stream ব্যবহার করে আপনি RabbitMQ এর মাধ্যমে মেসেজ প্রেরণ এবং গ্রহণ করতে পারেন। এই উদাহরণে, Producer মাইক্রোসার্ভিস মেসেজ পাঠাচ্ছে এবং Consumer মাইক্রোসার্ভিস সেটি গ্রহণ করছে। আপনি এটি Kafka বা অন্য মেসেজ ব্রোকারের সাথে ইন্টিগ্রেট করেও ব্যবহার করতে পারেন। Spring Cloud Stream সহজে ইভেন্ট-ড্রিভেন মাইক্রোসার্ভিস আর্কিটেকচার তৈরি করতে সহায়ক।
Read more