Apache Kafka Connect একটি শক্তিশালী ফ্রেমওয়ার্ক যা কাফকাকে বিভিন্ন ডেটা উৎস এবং গন্তব্যের সাথে সংযুক্ত করতে সাহায্য করে। তবে, কখনো কখনো আপনাকে এমন একটি কাস্টম কনেক্টর তৈরি করতে হতে পারে যা আপনার নির্দিষ্ট প্রয়োজন মেটাতে পারে। কাস্টম কনেক্টর তৈরি করার মাধ্যমে আপনি কাফকা কানেক্টের ফিচারগুলো কাস্টমাইজ করে আপনার প্রয়োজনীয় ডেটা ফ্লো এবং প্রসেসিং সেটআপ করতে পারবেন।
এই গাইডে, আমরা কীভাবে কাস্টম কাফকা কনেক্টর তৈরি করতে হয় এবং এর মূল কম্পোনেন্টগুলো কী, তা নিয়ে বিস্তারিত আলোচনা করবো।
Kafka Connect কাস্টম কনেক্টর কী?
Kafka Connect হল একটি ফ্রেমওয়ার্ক যা কাফকা ক্লাস্টারের সাথে বাইরের সিস্টেম (যেমন ডেটাবেস, ফাইল সিস্টেম, এবং অন্যান্য স্টোরেজ সিস্টেম) এর মধ্যে ডেটা স্ট্রীমিং এবং ট্রান্সফারের জন্য ব্যবহৃত হয়। কাফকা কনেক্টর দুটি ধরণের হতে পারে:
- Source Connector: এটি বাইরের সিস্টেম থেকে কাফকায় ডেটা আনে।
- Sink Connector: এটি কাফকায় সঞ্চিত ডেটা বাইরের সিস্টেমে প্রেরণ করে।
কাস্টম কনেক্টর তৈরি করতে হলে, আপনাকে এই দুটি টিপিক্যাল কনফিগারেশন এবং ইন্টারফেসের মাধ্যমে কাজ করতে হবে।
Custom Connector তৈরি করার জন্য প্রয়োজনীয় পদক্ষেপ
১. প্রয়োজনীয় লাইব্রেরি ইমপোর্ট করা
প্রথমে, আপনি Kafka Connect API ব্যবহার করতে হবে, যা Apache Kafka Connect API লাইব্রেরির অন্তর্গত। এই লাইব্রেরিগুলির মধ্যে রয়েছে kafka-connect-api এবং kafka-connect-runtime।
যদি আপনি মাভেন (Maven) ব্যবহার করেন, আপনার pom.xml ফাইলে নিচের ডিপেনডেন্সি যোগ করুন:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-connect-api</artifactId>
<version>3.0.0</version> <!-- কাফকার উপযুক্ত ভার্সন ব্যবহার করুন -->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-connect-runtime</artifactId>
<version>3.0.0</version>
</dependency>
২. Source Connector তৈরি করা
একটি কাস্টম source connector তৈরি করতে আপনাকে SourceConnector এবং SourceTask ক্লাসগুলি ইমপ্লিমেন্ট করতে হবে।
- SourceConnector ক্লাসটি মূল কনফিগারেশন প্রক্রিয়া পরিচালনা করে এবং কনেক্টরকে ইনিশিয়ালাইজ করে।
- SourceTask ক্লাসটি সেই কাজটি করে যা ডেটা গ্রহণ করার জন্য প্রয়োজন, যেমন ডেটা ফেচ করা এবং তা কাফকায় পাঠানো।
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.common.config.ConfigDef;
import java.util.List;
import java.util.Map;
public class MySourceConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
// কনফিগারেশন শুরু করা
}
@Override
public Class<? extends Task> taskClass() {
return MySourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// কনফিগারেশন টাস্কগুলির জন্য সৃষ্ট করা
return null;
}
@Override
public void stop() {
// কনেক্টর বন্ধ করা
}
@Override
public ConfigDef config() {
// কনফিগারেশন ডেফিনিশন
return new ConfigDef();
}
}
এখানে taskClass ফাংশনটি আপনাকে যেকোনো একটি টাস্ক ক্লাসের রিটার্ন করবে, যা মূলত ডেটা প্রক্রিয়া করার কাজ করে।
৩. Source Task তৈরি করা
SourceTask ক্লাসটি সেই স্থানে কাজ করবে যেখানে বাস্তবিক ডেটা কাফকায় প্রেরণ করা হয়। এর মধ্যে poll মেথডটি ডেটা নিয়ে আসার কাজ করে।
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import java.util.List;
import java.util.ArrayList;
public class MySourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
// এখানে কাস্টম কনফিগারেশন বা ইনিশিয়ালাইজেশন করতে পারেন
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
// ডেটা ফেচ করে কাফকায় পাঠানো
records.add(new SourceRecord(null, null, "my_topic", null, Schema.STRING_SCHEMA, "sample data"));
return records;
}
@Override
public void stop() {
// কনসিউমার বা অন্যান্য রিসোর্স বন্ধ করা
}
@Override
public String version() {
return "1.0";
}
}
এই কোডে, poll মেথডটি একটি ডেটা রেকর্ড তৈরি করে এবং তা কাফকা টপিকের মধ্যে পাঠায়।
৪. Sink Connector তৈরি করা
একটি কাস্টম sink connector তৈরি করতে হলে, আপনাকে SinkConnector এবং SinkTask ক্লাসগুলি ইমপ্লিমেন্ট করতে হবে।
- SinkConnector: এটি মূলত কনফিগারেশন এবং টাস্ক বিল্ডিংয়ের জন্য ব্যবহার করা হয়।
- SinkTask: এটি ডেটা প্রক্রিয়া করে এবং কাফকা থেকে বাহিরে কোনো সিস্টেমে লেখে।
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.List;
public class MySinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
// কনফিগারেশন শুরু করা
}
@Override
public Class<? extends SinkTask> taskClass() {
return MySinkTask.class;
}
@Override
public void stop() {
// কনেক্টর বন্ধ করা
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
}
এখানে taskClass মেথডটি SinkTask ক্লাসের রিটার্ন দেবে, যা ডেটা কাফকা থেকে গ্রহন করবে এবং টার্গেট ডেটাবেসে পাঠাবে।
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkRecord;
public class MySinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
// সিঙ্ক কনফিগারেশন শুরু
}
@Override
public void put(List<SinkRecord> records) {
for (SinkRecord record : records) {
// কাফকা থেকে ডেটা গ্রহন করে বাহিরে পাঠানো
System.out.println("Writing record to external system: " + record.value());
}
}
@Override
public void stop() {
// সিঙ্ক রিসোর্স বন্ধ করা
}
@Override
public String version() {
return "1.0";
}
}
৫. কনফিগারেশন ফাইল তৈরি করা
কাস্টম কনেক্টর তৈরি করার পর, একটি কনফিগারেশন ফাইল তৈরি করতে হবে যা কানেক্টরটি কনফিগার করবে। এই ফাইলটি সাধারণত .properties ফরম্যাটে হয়।
name=my-source-connector
connector.class=com.example.MySourceConnector
tasks.max=1
সারাংশ
Kafka Connect কাস্টম কনেক্টর তৈরি করার মাধ্যমে আপনি কাফকা সিস্টেমের সাথে বাইরের ডেটা উৎস এবং গন্তব্যকে সংযুক্ত করতে পারবেন। কাস্টম source এবং sink কনেক্টর তৈরির জন্য, আপনাকে SourceConnector, SourceTask, SinkConnector, এবং SinkTask ক্লাসগুলি ইমপ্লিমেন্ট করতে হবে। এর মাধ্যমে ডেটা ইনজেস্ট এবং আউটপুট প্রক্রিয়া কাস্টমাইজ করা যায়, যা আপনার কাফকা স্ট্রিমিং সিস্টেমের কার্যকারিতা বৃদ্ধি করে।
Read more