Apache Flink এর Connector এবং Integration সমৃদ্ধ লাইব্রেরি স্ট্রিম প্রসেসিং এবং ব্যাচ প্রসেসিংয়ের জন্য বিভিন্ন ডেটা সোর্স এবং সিঙ্কের সাথে সংযোগ স্থাপন এবং ডেটা আদান-প্রদান সহজ করে তোলে। Flink বিভিন্ন ডেটা স্টোরেজ সিস্টেম, মেসেজিং সিস্টেম, ফাইল সিস্টেম, এবং ডেটাবেসের সাথে ইন্টিগ্রেট করতে পারে, যা একে শক্তিশালী এবং স্কেলেবল ডেটা প্রসেসিং প্ল্যাটফর্ম করে তোলে।
Flink অনেকগুলো বিল্ট-ইন কনেক্টর সরবরাহ করে, যার মাধ্যমে বিভিন্ন ডেটা সোর্স থেকে ডেটা পড়া এবং বিভিন্ন ডেস্টিনেশনে ডেটা লেখা যায়। কিছু জনপ্রিয় কনেক্টর এবং তাদের কাজ নিচে উল্লেখ করা হলো:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> stream = env.addSource(kafkaConsumer);
CassandraSink.addSink(dataStream)
.setQuery("INSERT INTO keyspace.table (id, value) values (?, ?);")
.setClusterBuilder(() -> Cluster.builder().addContactPoint("localhost"))
.build();
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(element, ctx, indexer) -> {
Map<String, String> json = new HashMap<>();
json.put("data", element);
IndexRequest request = Requests.indexRequest()
.index("my-index")
.source(json);
indexer.add(request);
}
);
dataStream.addSink(esSinkBuilder.build());
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("INSERT INTO mytable (id, name) VALUES (?, ?)")
.build();
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("hdfs:///output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(sink);
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
FlinkKinesisConsumer<String> kinesisConsumer = new FlinkKinesisConsumer<>(
"input-stream",
new SimpleStringSchema(),
kinesisConsumerConfig
);
DataStream<String> kinesisStream = env.addSource(kinesisConsumer);
DataStream<String> text = env.readTextFile("file:///path/to/input");
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("file:///path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
text.addSink(sink);
PulsarSourceBuilder<String> pulsarSource = PulsarSource
.builder(new SimpleStringSchema())
.serviceUrl("pulsar://localhost:6650")
.subscriptionName("test-subscription")
.topic("my-topic");
Apache Flink বিভিন্ন ডেটা স্ট্রিম সোর্স এবং সিঙ্কের সাথে ইন্টিগ্রেশন করার জন্য অনেকগুলো কনেক্টর (Connector) সমর্থন করে। Flink-এর কনেক্টরগুলো ডেটা ইনজেস্ট এবং আউটপুট করার জন্য ব্যবহৃত হয়, যা স্ট্রিম প্রসেসিং অ্যাপ্লিকেশনগুলোর জন্য খুবই গুরুত্বপূর্ণ। এখানে Flink-এর কিছু জনপ্রিয় কনেক্টর যেমন Kafka, RabbitMQ, এবং Filesystem নিয়ে বিস্তারিত আলোচনা করা হলো।
Apache Kafka হলো একটি জনপ্রিয় ডিসট্রিবিউটেড স্ট্রিমিং প্ল্যাটফর্ম যা লার্জ-স্কেল স্ট্রিম ডেটা প্রসেসিং এবং ইন্টিগ্রেশন সলিউশন হিসেবে ব্যবহৃত হয়। Flink Kafka কনেক্টরের মাধ্যমে Kafka-র টপিক থেকে ডেটা পড়তে এবং লিখতে পারে।
Flink Kafka কনেক্টর ব্যবহার করতে হলে Maven বা Gradle প্রজেক্টে flink-connector-kafka
dependency যোগ করতে হয়। নিচে একটি উদাহরণ দেয়া হলো:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version> <!-- আপনার Flink সংস্করণ অনুসারে এটি পরিবর্তন করুন -->
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class FlinkKafkaExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka কনজিউমার কনফিগারেশন সেটআপ
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Kafka থেকে ডেটা পড়া
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
consumerProps
);
// Kafka প্রডিউসার কনফিগারেশন সেটআপ
Properties producerProps = new Properties();
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
"output-topic",
new SimpleStringSchema(),
producerProps
);
// ডেটা প্রসেসিং এবং আউটপুট
env.addSource(consumer)
.map(value -> "Processed: " + value)
.addSink(producer);
env.execute("Flink Kafka Example");
}
}
RabbitMQ একটি জনপ্রিয় মেসেজ ব্রোকার যা মেসেজ কিউ এবং পুব/সাব (Publish/Subscribe) মেসেজিং প্যাটার্ন সমর্থন করে। Flink RabbitMQ কনেক্টর ব্যবহার করে, RabbitMQ থেকে ডেটা ইনজেস্ট করা এবং ডেটা আউটপুট করা সম্ভব।
Flink RabbitMQ কনেক্টর ব্যবহার করতে হলে, Maven বা Gradle প্রজেক্টে flink-connector-rabbitmq
dependency যোগ করতে হবে।
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>1.15.2</version> <!-- আপনার Flink সংস্করণ অনুসারে এটি পরিবর্তন করুন -->
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class FlinkRabbitMQExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RabbitMQ কনফিগারেশন সেটআপ
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
// RabbitMQ থেকে ডেটা পড়া
env.addSource(new RMQSource<>(
connectionConfig,
"queue_name",
true,
new SimpleStringSchema()
)).print();
env.execute("Flink RabbitMQ Example");
}
}
Flink-এর Filesystem Connector স্ট্যাটিক এবং ডায়নামিক ডেটাসেটের জন্য ফাইল সিস্টেম থেকে ডেটা ইনজেস্ট করা এবং আউটপুট করতে সাহায্য করে। এটি লোকাল ফাইল সিস্টেম, HDFS, S3 ইত্যাদি স্টোরেজ সমর্থন করে।
Flink Filesystem Connector-এ সাধারণত readTextFile()
এবং writeAsText()
মেথড ব্যবহার করে ফাইল পড়া এবং লেখা যায়।
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkFilesystemExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Filesystem থেকে ডেটা পড়া
DataStream<String> fileStream = env.readTextFile("path/to/input.txt");
// ডেটা প্রসেসিং এবং ফাইল আউটপুট
fileStream
.map(value -> "Processed: " + value)
.writeAsText("path/to/output.txt");
env.execute("Flink Filesystem Example");
}
}
Apache Flink-এ Kafka, RabbitMQ, এবং Filesystem কনেক্টর ব্যবহার করে বিভিন্ন সোর্স এবং সিঙ্ক থেকে ডেটা ইনজেস্ট এবং আউটপুট করা যায়। এগুলো Flink-এর স্ট্রিম প্রসেসিং অ্যাপ্লিকেশনগুলোর জন্য অত্যন্ত গুরুত্বপূর্ণ এবং নির্ভরযোগ্য ডেটা ইন্টিগ্রেশন সমাধান প্রদান করে।
Apache Flink-এ Data Sources এবং Sinks হলো ডেটা প্রসেসিং পিপলাইনের দুটি মূল উপাদান যা ইনপুট ডেটা সংগ্রহ এবং আউটপুট ডেটা সংরক্ষণ করতে ব্যবহৃত হয়। Flink বিভিন্ন ধরনের ডেটা সোর্স এবং সিংক সাপোর্ট করে, যা বিভিন্ন ডেটা সিস্টেমের সাথে ইন্টিগ্রেশন সহজ করে তোলে।
Data Source হলো Flink অ্যাপ্লিকেশনের ইনপুট ডেটার উৎস। এটি বিভিন্ন ধরনের ডেটা সিস্টেম বা স্টোরেজ থেকে ডেটা সংগ্রহ করে এবং Flink স্ট্রিম প্রসেসিং ইঞ্জিনে প্রেরণ করে। Flink অনেকগুলি বিল্ট-ইন সোর্স সাপোর্ট করে যেমন:
উদাহরণ (File Source):
DataStream<String> textStream = env.readTextFile("path/to/textfile.txt");
Kafka Source উদাহরণ:
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"topic_name",
new SimpleStringSchema(),
properties
);
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
বর্ণনা: এখানে FlinkKafkaConsumer
ব্যবহার করে Kafka থেকে একটি স্ট্রিম পড়া হচ্ছে, যা নির্দিষ্ট topic
থেকে ডেটা সংগ্রহ করছে।
Data Sink হলো Flink অ্যাপ্লিকেশনের আউটপুট যেখানে প্রক্রিয়াকৃত ডেটা সংরক্ষণ করা হয়। এটি ডেটাকে বিভিন্ন আউটপুট ডেস্টিনেশন যেমন ফাইল, ডাটাবেস, মেসেজিং সিস্টেমে পাঠানোর জন্য ব্যবহৃত হয়। Flink এর বেশ কিছু বিল্ট-ইন সিংক রয়েছে:
উদাহরণ (File Sink):
resultStream.writeAsText("path/to/outputfile.txt", FileSystem.WriteMode.OVERWRITE);
Kafka Sink উদাহরণ:
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output_topic",
new SimpleStringSchema(),
properties
);
resultStream.addSink(kafkaProducer);
বর্ণনা: এখানে, প্রক্রিয়াকৃত ডেটা Kafka-তে output_topic
নামে একটি টপিকে প্রেরণ করা হচ্ছে।
Flink-এর Data Sources এবং Sinks ইন্টিগ্রেশন করার সময় নিম্নোক্ত ধাপগুলি অনুসরণ করতে হয়:
Flink-এ, প্রয়োজন অনুযায়ী Custom Sources এবং Sinks তৈরি করা যায়। কাস্টম সোর্স বা সিংক তৈরি করার সময়, SourceFunction
বা SinkFunction
ইন্টারফেস ইমপ্লিমেন্ট করতে হয়।
Custom Source উদাহরণ:
public class CustomStringSource implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect("Custom Data");
Thread.sleep(1000);
}
}
@Override
public void cancel() {
// Cleanup code
}
}
Custom Sink উদাহরণ:
public class CustomPrintSink implements SinkFunction<String> {
@Override
public void invoke(String value, Context context) {
System.out.println("Output: " + value);
}
}
Apache Flink-এ Data Sources এবং Sinks ইন্টিগ্রেশন করা স্ট্রিম এবং ব্যাচ প্রসেসিং অ্যাপ্লিকেশনের একটি গুরুত্বপূর্ণ অংশ। Flink এর স্ট্যান্ডার্ড সোর্স এবং সিংক সাপোর্ট করে বিভিন্ন ডেটা স্টোরেজ এবং মেসেজিং সিস্টেমের সাথে সহজেই ইন্টিগ্রেট করা যায়। এছাড়াও, কাস্টম সোর্স এবং সিংক তৈরি করে ফ্লেক্সিবিলিটি আরও বাড়ানো যায়।
Apache Flink এ Database এবং External System এর সাথে সংযোগ স্থাপন করা যায় বিভিন্ন বিল্ট-ইন কনেক্টর এবং API এর মাধ্যমে। Flink ডেটা স্ট্রিম এবং ব্যাচ প্রসেসিংয়ের জন্য রিলেশনাল ডাটাবেস, NoSQL ডাটাবেস, মেসেজিং সার্ভিস, এবং ফাইল সিস্টেমের সাথে ইন্টিগ্রেট করতে সক্ষম। Flink এই সংযোগ স্থাপনের জন্য বিভিন্ন ধরনের কনেক্টর ও টুলস প্রদান করে, যা সহজ এবং দক্ষ ডেটা প্রসেসিংকে সমর্থন করে।
Flink এর JDBC Connector ব্যবহার করে রিলেশনাল ডাটাবেস যেমন MySQL, PostgreSQL, Oracle, এবং অন্যান্য ডাটাবেসের সাথে সংযোগ স্থাপন করা যায়। এটি SQL এর মাধ্যমে ডেটা পড়া ও লেখা উভয়ই সমর্থন করে।
DataStream<Tuple2<Integer, String>> sourceStream = env.fromElements(
Tuple2.of(1, "Alice"),
Tuple2.of(2, "Bob")
);
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("INSERT INTO users (id, name) VALUES (?, ?)")
.build();
sourceStream.addSink(sink);
Flink বিভিন্ন NoSQL ডাটাবেসের জন্য কনেক্টর সরবরাহ করে, যেমন Apache Cassandra, MongoDB ইত্যাদি। NoSQL ডাটাবেস সাধারণত বড় আকারের ডিস্ট্রিবিউটেড ডেটা ম্যানেজমেন্ট এবং রিয়েল-টাইম ডেটা অ্যাপ্লিকেশনগুলির জন্য ব্যবহৃত হয়।
CassandraSink.addSink(dataStream)
.setQuery("INSERT INTO keyspace.table (id, value) values (?, ?);")
.setClusterBuilder(() -> Cluster.builder().addContactPoint("localhost"))
.build();
Properties properties = new Properties();
properties.setProperty("mongo.uri", "mongodb://localhost:27017");
properties.setProperty("database", "mydb");
properties.setProperty("collection", "mycollection");
FlinkMongoSink<String> mongoSink = new FlinkMongoSink<>(properties);
dataStream.addSink(mongoSink);
Message Queue System এর মাধ্যমে Flink রিয়েল-টাইম ডেটা স্ট্রিম প্রসেসিং করতে পারে। Flink এর বিল্ট-ইন কনেক্টর রয়েছে Apache Kafka, RabbitMQ, এবং Amazon Kinesis এর জন্য, যা ডেটা স্ট্রিমিং অ্যাপ্লিকেশনগুলির জন্য গুরুত্বপূর্ণ।
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.build();
RMQSource<String> rabbitMQSource = new RMQSource<>(
connectionConfig,
"queue_name",
true,
new SimpleStringSchema()
);
DataStream<String> rabbitMQStream = env.addSource(rabbitMQSource);
Flink বিভিন্ন ফাইল সিস্টেম যেমন HDFS, S3, GCS ইত্যাদির সাথে ইন্টিগ্রেট করতে পারে। File System Connector ব্যবহার করে Flink ফাইল থেকে ডেটা পড়তে এবং সেখানে ডেটা লিখতে পারে।
StreamingFileSink<String> hdfsSink = StreamingFileSink
.forRowFormat(new Path("hdfs:///output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(hdfsSink);
StreamingFileSink<String> s3Sink = StreamingFileSink
.forRowFormat(new Path("s3://bucket-name/output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(s3Sink);
Apache Flink এবং Apache Kafka-এর ইন্টিগ্রেশন অত্যন্ত সাধারণ এবং শক্তিশালী একটি পদ্ধতি যা স্ট্রিম ডেটা প্রসেসিং-এর জন্য খুব কার্যকরী। Flink Kafka কনেক্টরের মাধ্যমে, আপনি Kafka টপিক থেকে ডেটা পড়তে এবং Kafka টপিকে ডেটা লিখতে পারেন। Flink এবং Kafka ইন্টিগ্রেশনের জন্য নিম্নলিখিত ধাপগুলো অনুসরণ করতে হয়:
আপনার Maven বা Gradle প্রজেক্টে flink-connector-kafka
dependency যোগ করতে হবে। এটি Flink এবং Kafka-এর মধ্যে ইন্টিগ্রেশন করতে সহায়ক হবে। নিচে Maven এর জন্য dependency দেয়া হলো:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.2</version> <!-- আপনার Flink সংস্করণ অনুসারে এটি পরিবর্তন করুন -->
</dependency>
Kafka এবং Flink-এর মধ্যে ইন্টিগ্রেশন করার জন্য কিছু কনফিগারেশন সেট করতে হবে, যেমন: Kafka brokers, টপিকের নাম, গ্রুপ আইডি ইত্যাদি। এই কনফিগারেশনগুলো Properties ক্লাস ব্যবহার করে সেট করা হয়।
Kafka থেকে ডেটা পড়তে Flink Kafka Consumer এবং ডেটা লিখতে Flink Kafka Producer ব্যবহার করা হয়।
নিচে একটি উদাহরণ দেয়া হলো যেখানে Flink একটি Kafka টপিক থেকে ডেটা পড়ছে, প্রক্রিয়াজাত করছে এবং অন্য একটি Kafka টপিকে আউটপুট দিচ্ছে।
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class FlinkKafkaIntegrationExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Consumer-এর কনফিগারেশন সেট করা
Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-kafka-group");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Flink Kafka Consumer তৈরি করা
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"input-topic", // Kafka টপিকের নাম
new SimpleStringSchema(), // ডেটা স্কিমা
consumerProperties // কনফিগারেশন
);
// Kafka থেকে ডেটা পড়া
DataStream<String> stream = env.addSource(kafkaConsumer);
// ডেটা প্রসেস করা
DataStream<String> processedStream = stream
.map(value -> "Processed: " + value);
// Kafka Producer-এর কনফিগারেশন সেট করা
Properties producerProperties = new Properties();
producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Flink Kafka Producer তৈরি করা
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output-topic", // আউটপুট Kafka টপিকের নাম
new SimpleStringSchema(), // ডেটা স্কিমা
producerProperties // কনফিগারেশন
);
// ডেটা Kafka টপিকে লিখে দেয়া
processedStream.addSink(kafkaProducer);
// Flink Job চালানো
env.execute("Flink Kafka Integration Example");
}
}
BOOTSTRAP_SERVERS_CONFIG
: Kafka broker-এর ঠিকানা।GROUP_ID_CONFIG
: কনজিউমার গ্রুপ আইডি।KEY_DESERIALIZER_CLASS_CONFIG
এবং VALUE_DESERIALIZER_CLASS_CONFIG
: ডেটা ডেসেরিয়ালাইজার ক্লাস, যা ডেটা পড়তে ব্যবহৃত হয়।map
অপারেশন ব্যবহার করা হয়েছে, যা প্রতিটি ইভেন্টে "Processed: "
যুক্ত করে।BOOTSTRAP_SERVERS_CONFIG
: Kafka broker-এর ঠিকানা।KEY_SERIALIZER_CLASS_CONFIG
এবং VALUE_SERIALIZER_CLASS_CONFIG
: ডেটা সিরিয়ালাইজার ক্লাস, যা ডেটা লিখতে ব্যবহৃত হয়।env.execute()
মেথডটি Flink-এর জব শুরু করে।Flink এবং Apache Kafka-এর ইন্টিগ্রেশন অত্যন্ত সহজ এবং কার্যকরী। এটি real-time এবং streaming data processing অ্যাপ্লিকেশনের জন্য একটি শক্তিশালী সমাধান, যেখানে ডেটা প্রসেসিং, aggregation, এবং complex event processing সহজে করা সম্ভব হয়।