Apache Flink এ Database এবং External System এর সাথে সংযোগ স্থাপন করা যায় বিভিন্ন বিল্ট-ইন কনেক্টর এবং API এর মাধ্যমে। Flink ডেটা স্ট্রিম এবং ব্যাচ প্রসেসিংয়ের জন্য রিলেশনাল ডাটাবেস, NoSQL ডাটাবেস, মেসেজিং সার্ভিস, এবং ফাইল সিস্টেমের সাথে ইন্টিগ্রেট করতে সক্ষম। Flink এই সংযোগ স্থাপনের জন্য বিভিন্ন ধরনের কনেক্টর ও টুলস প্রদান করে, যা সহজ এবং দক্ষ ডেটা প্রসেসিংকে সমর্থন করে।
Flink এর JDBC Connector ব্যবহার করে রিলেশনাল ডাটাবেস যেমন MySQL, PostgreSQL, Oracle, এবং অন্যান্য ডাটাবেসের সাথে সংযোগ স্থাপন করা যায়। এটি SQL এর মাধ্যমে ডেটা পড়া ও লেখা উভয়ই সমর্থন করে।
DataStream<Tuple2<Integer, String>> sourceStream = env.fromElements(
Tuple2.of(1, "Alice"),
Tuple2.of(2, "Bob")
);
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("INSERT INTO users (id, name) VALUES (?, ?)")
.build();
sourceStream.addSink(sink);
Flink বিভিন্ন NoSQL ডাটাবেসের জন্য কনেক্টর সরবরাহ করে, যেমন Apache Cassandra, MongoDB ইত্যাদি। NoSQL ডাটাবেস সাধারণত বড় আকারের ডিস্ট্রিবিউটেড ডেটা ম্যানেজমেন্ট এবং রিয়েল-টাইম ডেটা অ্যাপ্লিকেশনগুলির জন্য ব্যবহৃত হয়।
CassandraSink.addSink(dataStream)
.setQuery("INSERT INTO keyspace.table (id, value) values (?, ?);")
.setClusterBuilder(() -> Cluster.builder().addContactPoint("localhost"))
.build();
Properties properties = new Properties();
properties.setProperty("mongo.uri", "mongodb://localhost:27017");
properties.setProperty("database", "mydb");
properties.setProperty("collection", "mycollection");
FlinkMongoSink<String> mongoSink = new FlinkMongoSink<>(properties);
dataStream.addSink(mongoSink);
Message Queue System এর মাধ্যমে Flink রিয়েল-টাইম ডেটা স্ট্রিম প্রসেসিং করতে পারে। Flink এর বিল্ট-ইন কনেক্টর রয়েছে Apache Kafka, RabbitMQ, এবং Amazon Kinesis এর জন্য, যা ডেটা স্ট্রিমিং অ্যাপ্লিকেশনগুলির জন্য গুরুত্বপূর্ণ।
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.build();
RMQSource<String> rabbitMQSource = new RMQSource<>(
connectionConfig,
"queue_name",
true,
new SimpleStringSchema()
);
DataStream<String> rabbitMQStream = env.addSource(rabbitMQSource);
Flink বিভিন্ন ফাইল সিস্টেম যেমন HDFS, S3, GCS ইত্যাদির সাথে ইন্টিগ্রেট করতে পারে। File System Connector ব্যবহার করে Flink ফাইল থেকে ডেটা পড়তে এবং সেখানে ডেটা লিখতে পারে।
StreamingFileSink<String> hdfsSink = StreamingFileSink
.forRowFormat(new Path("hdfs:///output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(hdfsSink);
StreamingFileSink<String> s3Sink = StreamingFileSink
.forRowFormat(new Path("s3://bucket-name/output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(s3Sink);