Kafka এবং Schema Registry Integration

অ্যাপাচি কাফকা (Apache Kafka) - Big Data and Analytics

417

Schema Registry হল একটি গুরুত্বপূর্ণ উপাদান যা কাফকা ক্লাস্টারের সাথে একত্রে কাজ করে, বিশেষ করে যখন ডেটা সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন করা হয়। এটি Apache Avro স্কিমা ব্যবহার করে কাফকা মেসেজের স্ট্রাকচার বজায় রাখতে সহায়তা করে। Schema Registry কাফকায় স্টোর করা ডেটার স্কিমার সেন্ট্রাল রেপোজিটরি হিসেবে কাজ করে, এবং এটি ডেটার সঠিক কাঠামো নিশ্চিত করে, যাতে বিভিন্ন সার্ভিস এবং অ্যাপ্লিকেশনগুলো সঠিকভাবে ডেটা প্রক্রিয়া করতে পারে।

এই গাইডে আমরা আলোচনা করব কিভাবে Kafka এবং Schema Registry একত্রে কাজ করে এবং কিভাবে সেটি কনফিগার ও ব্যবহার করা যায়।


১. Schema Registry কী?

Schema Registry হল একটি সার্ভিস যা মেসেজ প্রডিউসার এবং কনজিউমারদের মধ্যে ডেটা স্কিমা প্রদান করে। এটি ডেটার স্ট্রাকচার নির্ধারণ করে এবং নিশ্চিত করে যে সমস্ত প্রডিউসার এবং কনজিউমার একই স্কিমা অনুসরণ করছে। এর মূল সুবিধা হলো, এটি ডেটার ভ্যালিডেশন এবং ডেটা মডেলিং সহজ করে।

Schema Registry প্রধানত Apache Avro, JSON Schema, অথবা Protobuf স্কিমার মাধ্যমে কাজ করে, তবে Avro বেশিরভাগ কাফকা ব্যবহারকারীদের মধ্যে জনপ্রিয়।


২. Kafka এবং Schema Registry Integration

Kafka এবং Schema Registry একত্রে কাজ করার জন্য প্রথমে Schema Registry সার্ভিসটি ইনস্টল এবং কনফিগার করতে হবে। এর পর, Kafka প্রডিউসার এবং কনজিউমারগুলিকে Schema Registry এর সাথে ইন্টিগ্রেট করতে হবে।

২.১. Schema Registry ইন্সটলেশন

Apache Kafka এবং Schema Registry ইনস্টল করার জন্য প্রথমে Confluent Platform ডাউনলোড করতে হবে, কারণ এটি Schema Registry এবং অন্যান্য সম্পর্কিত টুল সরবরাহ করে।

  1. Confluent Platform ডাউনলোড করুন: আপনি Confluent Platform থেকে ডাউনলোড করতে পারেন। এটি আপনাকে Kafka, Schema Registry এবং অন্যান্য প্রয়োজনীয় টুল সরবরাহ করবে।
  2. Schema Registry ইনস্টল করুন: Confluent Platform এর মাধ্যমে Schema Registry ইনস্টল এবং কনফিগার করা সহজ। এখানে একটি সাধারণ ইনস্টলেশন প্রক্রিয়া দেওয়া হলো:
    • Schema Registry ডাউনলোড এবং ইনস্টল করুন:

      # Confluent Platform repository যুক্ত করুন (Linux এর জন্য)
      wget -qO - https://packages.confluent.io/deb/7.0/archive.key | sudo apt-key add -
      sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.0 stable main"
      sudo apt-get update
      sudo apt-get install confluent-schema-registry
      
  3. Schema Registry কনফিগারেশন: Schema Registry এর কনফিগারেশন ফাইল schema-registry.properties সাধারণত /etc/schema-registry/schema-registry.properties এ থাকে। এখানে আপনাকে Kafka ব্রোকারের ঠিকানা এবং অন্যান্য প্যারামিটার কনফিগার করতে হবে।

    উদাহরণ কনফিগারেশন:

    kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
    kafkastore.topic=_schemas
    

    এই কনফিগারেশনে, kafkastore.bootstrap.servers হল Kafka ক্লাস্টারের ঠিকানা এবং kafkastore.topic হল স্কিমা সংরক্ষণের জন্য নির্ধারিত Kafka টপিক।


৩. Kafka Producer এবং Consumer এর সাথে Schema Registry ব্যবহার

Kafka প্রডিউসার এবং কনজিউমার যখন Schema Registry ব্যবহার করে, তখন তারা Avro বা অন্য স্কিমা টাইপ ব্যবহার করে মেসেজ পাঠায় এবং গ্রহণ করে। Avro সেরিয়ালাইজেশন প্রক্রিয়া ব্যবহৃত হয় যাতে মেসেজের স্কিমা নির্ধারিত থাকে এবং সঠিকভাবে ডেটা পাঠানো যায়।

৩.১. Kafka Producer কনফিগারেশন

Kafka প্রডিউসারকে Schema Registry এর সাথে সংযোগ করতে হলে, Avro সেরিয়ালাইজার এবং Schema Registry client কনফিগার করতে হবে। Kafka Avro Serializer ব্যবহার করা হয় যাতে ডেটা Avro ফরম্যাটে সেরিয়ালাইজ করা যায়।

প্রডিউসার কনফিগারেশন উদাহরণ (Java):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Properties;

public class AvroProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

        // আপনার Avro record তৈরি করুন এবং প্রডিউস করুন
        // producer.send(new ProducerRecord<>(topic, key, avroRecord));
    }
}
  • schema.registry.url প্যারামিটারটি Schema Registry সার্ভারের URL নির্দেশ করে।
  • KafkaAvroSerializer ক্লাসটি ডেটাকে Avro ফরম্যাটে সেরিয়ালাইজ করতে ব্যবহার করা হয়।

৩.২. Kafka Consumer কনফিগারেশন

Kafka কনজিউমারের ক্ষেত্রে, AvroDeserializer ব্যবহার করে ডেটাকে ডিসেরিয়ালাইজ করা হয়। এই কনফিগারেশনও Schema Registry এর সাথে কাজ করতে হবে।

কনজিউমার কনফিগারেশন উদাহরণ (Java):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import java.util.Properties;

public class AvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("my_topic"));

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, Object> record : records) {
                // আপনার ডেটা প্রসেস করুন
                System.out.println(record.value());
            }
        }
    }
}
  • schema.registry.url এবং AvroDeserializer এর মাধ্যমে ডেটা ডিসেরিয়ালাইজ করা হয়।

৪. Schema Evolution এবং Compatibility

Schema Registry একাধিক স্কিমা সংস্করণের সাপোর্ট করে, যা schema evolution নামে পরিচিত। এতে বিভিন্ন ধরনের ডেটা সংস্করণ ব্যবস্থাপনা করা যায় এবং নিশ্চিত করা হয় যে নতুন স্কিমা পুরানো স্কিমার সাথে সঙ্গতিপূর্ণ।

  • Compatibility: Schema Registry এর মধ্যে BACKWARD, FORWARD, এবং FULL কনফিগারেশন ব্যবহার করে স্কিমার compatibility চেক করা যায়।
    • BACKWARD: পুরানো স্কিমার সাথে নতুন স্কিমা সঙ্গতিপূর্ণ।
    • FORWARD: নতুন স্কিমার সাথে পুরানো স্কিমা সঙ্গতিপূর্ণ।
    • FULL: উভয় স্কিমার মধ্যে সঙ্গতি নিশ্চিত।

স্কিমা কমপ্যাটিবিলিটি কনফিগারেশন:

# Compatibility চেক করা
curl -X GET http://localhost:8081/subjects/my-topic-value/versions/latest

এটি বর্তমান স্কিমার সংস্করণ এবং কমপ্যাটিবিলিটি সুনিশ্চিত করতে সাহায্য করে।


সারাংশ

Kafka এবং Schema Registry একত্রে কাজ করার মাধ্যমে কাফকা মেসেজগুলির স্কিমা নিশ্চিত করা যায় এবং ডেটার সঠিকতা বজায় থাকে। Schema Registry ডেটার সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন প্রক্রিয়া পরিচালনা করে এবং মেসেজ প্রডিউসার ও কনজিউমারদের জন্য একটি সেন্ট্রাল স্কিমা রেপোজিটরি প্রদান করে। এটি কাফকা ক্লাস্টারে schema evolution এবং compatibility পরিচালনা করার ক্ষেত্রে খুবই কার্যকর।

Content added By

Schema Registry হল একটি কেন্দ্রীয় সার্ভিস যা মেসেজের স্কিমা সংরক্ষণ, পরিচালনা এবং ভার্সনিং-এর জন্য ব্যবহৃত হয়। এটি Apache Kafka এবং অন্যান্য স্ট্রীমিং প্রযুক্তির সঙ্গে একত্রে কাজ করে ডেটার সঠিক কাঠামো এবং ফরম্যাট নিশ্চিত করতে সাহায্য করে। Schema Registry প্রধানত Avro, JSON Schema, বা Protobuf ফরম্যাটে ডেটা স্কিমা সাপোর্ট করে এবং এটি মেসেজ প্রডিউসার এবং কনজিউমারদের মধ্যে ডেটা ফরম্যাটের সামঞ্জস্য বজায় রাখে।

Kafka-তে স্কিমা ব্যবহারের মূল উদ্দেশ্য হল, মেসেজ বা ডেটার কাঠামো প্রতিটি প্রডিউসার এবং কনজিউমার দ্বারা একইভাবে বোঝা এবং প্রক্রিয়া করা, যাতে ডেটার সংস্করণ ব্যবস্থাপনা (versioning) এবং ভাল সংজ্ঞায়িত কাঠামো নিশ্চিত করা যায়।


১. Schema Registry কী?

Schema Registry একটি সেন্ট্রাল সার্ভিস যা ডেটা স্কিমাগুলি সংরক্ষণ, পরিচালনা এবং ভাগাভাগি করতে সহায়ক। এটি প্রধানত Apache Avro ফরম্যাটে ডেটা সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন জন্য ব্যবহৃত হয়। Kafka প্রডিউসার এবং কনজিউমার স্কিমা রেজিস্ট্রির মাধ্যমে স্কিমার প্রতি নির্দিষ্ট URL এর মাধ্যমে স্কিমা অঙ্গীকার করতে পারে এবং ডেটার সঠিক কাঠামো বজায় রাখতে পারে।

Kafka এবং Schema Registry একসাথে কাজ করলে, স্কিমাগুলির ইভোলিউশন (evolution) বা পরিবর্তন করতে হয় না, বরং পুরানো এবং নতুন স্কিমার মধ্যে কমপ্যাটিবিলিটি (compatibility) রক্ষা করা হয়।


২. Schema Registry এর ভূমিকা

Schema Registry এর কয়েকটি গুরুত্বপূর্ণ ভূমিকা রয়েছে, যা Kafka ক্লাস্টারের কার্যকারিতা এবং ডেটার আস্থেয়তা নিশ্চিত করে।

২.১. ডেটার কাঠামো নিশ্চিত করা

Schema Registry প্রডিউসার এবং কনজিউমারদের মধ্যে ডেটা কাঠামো নির্ধারণ করে। এটি নিশ্চিত করে যে পাঠানো এবং গ্রহণ করা মেসেজগুলির কাঠামো সঠিক এবং অভ্যন্তরীণভাবে মিল রয়েছে। Schema Registry মেসেজ পাঠানোর সময় স্কিমা চেক করে এবং নিশ্চিত করে যে প্রডিউসার এবং কনজিউমার উভয়ই একই স্কিমা অনুসরণ করছে।

২.২. স্কিমা ভার্সনিং এবং ইভোলিউশন

Schema Registry বিভিন্ন স্কিমা ভার্সন পরিচালনা করতে সহায়তা করে। ডেটার কাঠামোতে কোনো পরিবর্তন আসলে তা Schema Registry-তে সংরক্ষিত থাকে এবং পরিবর্তনগুলো কনজিউমারদের কাছে উপযুক্তভাবে পৌঁছানো হয়। এটি Backward compatibility, Forward compatibility, এবং Full compatibility নিশ্চিত করে, যার মাধ্যমে পুরানো স্কিমা এবং নতুন স্কিমার মধ্যে সঠিক ইন্টারঅ্যাকশন ঘটে।

  • Backward Compatibility: নতুন স্কিমা পুরোনো স্কিমার সাথে সঙ্গতিপূর্ণ।
  • Forward Compatibility: পুরোনো স্কিমা নতুন স্কিমার সাথে সঙ্গতিপূর্ণ।
  • Full Compatibility: উভয় স্কিমা একে অপরের সাথে সঙ্গতিপূর্ণ।

২.৩. স্কিমা স্টোরেজ এবং রেজিস্ট্রেশন

Schema Registry একটি কেন্দ্রীয় রেপোজিটরি হিসেবে কাজ করে, যেখানে সমস্ত স্কিমা সংরক্ষিত হয়। এটি স্কিমা রেজিস্ট্রেশন করতে এবং ম্যানেজ করতে সাহায্য করে, যাতে প্রডিউসার এবং কনজিউমাররা একে অপরের স্কিমা ব্যবহার করতে পারে। Schema Registry সংরক্ষিত স্কিমার মধ্যে ভার্সন আপডেট বা পরিবর্তনও করতে পারে।

২.৪. ডেটা সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন

Schema Registry-এর মাধ্যমে Avro, JSON Schema, এবং Protobuf সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন করা হয়। এটি নিশ্চিত করে যে ডেটা পাঠানো এবং গ্রহণ করা সঠিক ফরম্যাটে হচ্ছে এবং ডেটার অখণ্ডতা বজায় থাকে।

২.৫. মেসেজ ভ্যালিডেশন

Schema Registry প্রডিউসার এবং কনজিউমারদের মেসেজের কাঠামো ভ্যালিডেট করতে সহায়ক। এটি নিশ্চিত করে যে মেসেজ প্রেরণের সময় স্কিমা সঠিকভাবে ব্যবহার হচ্ছে এবং কোনো ডেটা অসামঞ্জস্যপূর্ণ নয়। স্কিমা ভ্যালিডেশন প্রক্রিয়ার মাধ্যমে ডেটার সামঞ্জস্য এবং সঠিকতা যাচাই করা যায়।


৩. Schema Registry এর ফিচারসমূহ

Schema Registry এর কিছু গুরুত্বপূর্ণ ফিচার:

৩.১. স্কিমার জন্য REST API

Schema Registry একটি REST API প্রদান করে যার মাধ্যমে স্কিমা রেজিস্টার, আপডেট, বা ভ্যালিডেট করা যেতে পারে। এটি স্কিমার সেন্ট্রালাইজড ম্যানেজমেন্ট সক্ষম করে এবং Kafka প্রডিউসার ও কনজিউমারকে সহজভাবে স্কিমার সঙ্গে কাজ করতে সহায়তা করে।

৩.২. স্কিমা রিভিশন ম্যানেজমেন্ট

Schema Registry স্কিমার রিভিশন ট্র্যাক করে এবং নতুন রিভিশন যোগ করার জন্য স্কিমা পরিচালনার সুবিধা প্রদান করে। এটি রিভিশন ইতিহাস দেখার এবং প্রয়োজনীয় স্কিমা সংস্করণ অনুসন্ধানের জন্য সহায়ক।

৩.৩. কমপ্যাটিবিলিটি পলিসি

Schema Registry স্কিমার মধ্যে কমপ্যাটিবিলিটি নিশ্চিত করার জন্য বিভিন্ন পলিসি সাপোর্ট করে, যেমন BACKWARD, FORWARD, এবং FULL কমপ্যাটিবিলিটি। এই পলিসিগুলোর মাধ্যমে ডেটা রিভিশন এবং পরিবর্তন সঠিকভাবে ম্যানেজ করা যায়।


৪. Kafka এবং Schema Registry এর মধ্যে ইন্টিগ্রেশন

Kafka এবং Schema Registry একত্রে কাজ করার জন্য Schema Registry এর URL এবং সঠিক সেরিয়ালাইজেশন কনফিগারেশন করতে হয়। সাধারণত Kafka প্রডিউসার এবং কনজিউমারদের মধ্যে AvroSerializer এবং AvroDeserializer ব্যবহার করা হয়, যেগুলি Schema Registry এর সাথে ইন্টিগ্রেটেড থাকে।

৪.১. Kafka Producer এবং Schema Registry Integration

Kafka প্রডিউসারকে Schema Registry এর সাথে সংযোগ করতে হলে, Schema Registry URL এবং Avro সেরিয়ালাইজার কনফিগার করতে হবে। এই কনফিগারেশন Kafka প্রডিউসারকে Avro ফরম্যাটে ডেটা পাঠানোর অনুমতি দেয়।

প্রডিউসার কনফিগারেশন উদাহরণ (Java):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

৪.২. Kafka Consumer এবং Schema Registry Integration

Kafka কনজিউমারের ক্ষেত্রে, AvroDeserializer ব্যবহার করে ডেটা ডিসেরিয়ালাইজ করা হয়। Schema Registry এর মাধ্যমে ডেটা ডিসেরিয়ালাইজেশন প্রক্রিয়া সঠিকভাবে সম্পন্ন হয়।

কনজিউমার কনফিগারেশন উদাহরণ (Java):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my_topic"));

সারাংশ

Schema Registry Apache Kafka-এর একটি গুরুত্বপূর্ণ উপাদান যা মেসেজ প্রডিউসার এবং কনজিউমারদের মধ্যে ডেটা স্কিমার সঠিকতা এবং সামঞ্জস্য বজায় রাখে। এটি ডেটার সেরিয়ালাইজেশন এবং ডিসেরিয়ালাইজেশন প্রক্রিয়া সহজ করে এবং স্কিমার ইভোলিউশন ম্যানেজ করতে সহায়ক। Schema Registry-র মাধ্যমে কাফকা ক্লাস্টারে ডেটার সঠিক কাঠামো এবং সংস্করণ বজায় রাখা সম্ভব হয়, যা মেসেজ ভ্যালিডেশন এবং ডেটা সঠিকতার জন্য অত্যন্ত গুরুত্বপূর্ণ।

Content added By

Avro হল একটি ডেটা সেরিয়ালাইজেশন ফ্রেমওয়ার্ক যা বিশেষভাবে ডিস্ট্রিবিউটেড সিস্টেমে ডেটা প্রেরণ করার জন্য ডিজাইন করা হয়েছে। এটি একটি বাইনারি ফর্ম্যাট যা ডেটা কম্প্যাক্টভাবে সঞ্চয় এবং ট্রান্সফার করতে সহায়তা করে, এবং এটি কাফকা ক্লাস্টারের মধ্যে ডেটা সঞ্চালন করার জন্য একটি জনপ্রিয় সেরিয়ালাইজেশন পদ্ধতি। Avro এর সবচেয়ে বড় সুবিধা হল schema-based সেরিয়ালাইজেশন, যেখানে ডেটার গঠন (schema) নির্ধারণ করে, এবং schema ইন্টিগ্রেশনটি অত্যন্ত সুবিধাজনক এবং নমনীয় হয়।


Avro Schema কী?

Avro Schema হল একটি JSON ফর্ম্যাটে সংজ্ঞায়িত ডেটার গঠন (structure)। এটি একটি ডেটা সেরিয়ালাইজেশন ফরম্যাটের জন্য একটি মেটাডেটা হিসেবে কাজ করে, যা ডেটার প্রতিটি ফিল্ডের নাম, টাইপ, এবং অন্যান্য প্রপার্টি সম্পর্কে তথ্য প্রদান করে। Avro একটি schema registry এর সাহায্যে schema গুলোকে সেন্ট্রালাইজডভাবে পরিচালনা করতে পারে, যা সহজে schema আপডেট এবং ভার্সনিং পরিচালনা করতে সক্ষম করে।

Avro ফাইলের মূল বৈশিষ্ট্য হল:

  • Compact: এটি কম্প্যাক্ট বাইনারি ফরম্যাটে ডেটা সঞ্চয় করে।
  • Schema-Driven: ডেটার গঠন schema দিয়ে পরিচালিত হয়।
  • Self-describing: প্রতিটি Avro ডেটা ব্লকের সাথে তার schema অন্তর্ভুক্ত থাকে, যা ডেটার ব্যাখ্যা প্রদান করে।

Avro Data Serialization in Kafka

Kafka-তে Avro সেরিয়ালাইজেশন ব্যবহৃত হয় যাতে ডেটা সঞ্চালন দ্রুত এবং কম্প্যাক্ট হয়, এবং কাফকার প্রযোজনা (producer) ও গ্রহণ (consumer) উভয় প্রান্তে schema সার্বভৌমতা নিশ্চিত থাকে। Kafka-তে Avro ব্যবহারের মাধ্যমে ডেটার গঠন নির্ধারণ করা এবং তার স্ট্রাকচারাল অখণ্ডতা নিশ্চিত করা সহজ হয়।

Kafka Producer-এ Avro Serialization:

Kafka Producer-এ Avro Serialization ব্যবহার করতে হলে কিছু নির্দিষ্ট লাইব্রেরি এবং কনফিগারেশন প্রয়োজন:

  1. Avro Dependency: প্রথমে Avro সংক্রান্ত ডিপেনডেন্সি আপনার প্রজেক্টে অন্তর্ভুক্ত করতে হবে।
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.10.2</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.x.x</version>
</dependency>
  1. Avro Schema File: একটি Avro schema ফাইল তৈরি করুন, যেখানে আপনি ডেটার গঠন সংজ্ঞায়িত করবেন। উদাহরণস্বরূপ:

user.avsc (Avro Schema Example):

{
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}

এটি একটি User নামক রেকর্ড তৈরি করে, যার মধ্যে দুটি ফিল্ড রয়েছে: name (string টাইপ) এবং age (int টাইপ)।

  1. Producer Configuration: Kafka producer কনফিগারেশনে Avro serializer ব্যবহার করতে হবে। নিচের কনফিগারেশন উদাহরণ:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");  // Schema Registry URL
  1. Producer Implementation: Kafka Producer কোডে Avro ডেটা সেরিয়ালাইজ করার উদাহরণ:
public class AvroProducer {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put("schema.registry.url", "http://localhost:8081");

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties);

        String topic = "user-topic";
        
        // Load schema
        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        
        // Create a GenericRecord
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "John Doe");
        user.put("age", 30);
        
        // Produce the record
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "key", user);
        producer.send(record);
        
        producer.close();
    }
}

Avro Data Consumer-এ Deserialization

Kafka Consumer-এ Avro ডেটা ডিসিরিয়ালাইজ করতে নিম্নলিখিত পদক্ষেপ অনুসরণ করা হয়:

  1. Consumer Configuration: Kafka Consumer কনফিগারেশনে Avro Deserializer ব্যবহার করতে হবে।
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
  1. Consumer Implementation: Kafka Consumer এর মাধ্যমে Avro ডেটা ডিসিরিয়ালাইজ করার উদাহরণ:
public class AvroConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "consumer-group");
        properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("user-topic"));

        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                GenericRecord user = record.value();
                System.out.println("Received user: " + user.get("name") + ", Age: " + user.get("age"));
            }
        }
    }
}

এখানে, Consumer এর মাধ্যমে Avro ডেটা ডিসিরিয়ালাইজ করে GenericRecord অবজেক্টে প্রসেস করা হচ্ছে।


Avro এবং Kafka Schema Registry

Avro ফাইলগুলি যখন Kafka টপিকে প্রেরণ করা হয়, তখন Kafka Schema Registry ব্যবহৃত হয়, যা schemas সেন্ট্রালাইজডভাবে স্টোর এবং ম্যানেজ করে। এটি Kafka Producer এবং Consumer এর মধ্যে schema incompatibilities দূর করতে সাহায্য করে।

Schema Registry হল একটি সার্ভিস যা schema গুলো সংরক্ষণ এবং ভার্সনিং সাপোর্ট করে। যখনই schema আপডেট হয়, schema registry নিশ্চিত করে যে producer এবং consumer উভয় পক্ষই সঠিক schema ব্যবহার করছে।


সারাংশ

Kafka তে Avro এর মাধ্যমে ডেটা সেরিয়ালাইজেশন একটি শক্তিশালী এবং কার্যকরী পদ্ধতি, যা ডেটা কম্প্যাক্টনেস, schema-based validation, এবং efficient serialization নিশ্চিত করে। এটি ডেটা স্ট্রিমিং এবং প্রক্রিয়াকরণে schema বেসড অ্যাপ্রোচ ব্যবহার করতে সাহায্য করে এবং Kafka Producer এবং Consumer এর মধ্যে নির্ভরযোগ্য ডেটা ট্রান্সফার সমর্থন করে। Kafka-র সাথে Avro ব্যবহারের মাধ্যমে ডেটা গঠন সহজে ম্যানেজ করা যায় এবং সিস্টেমে নমনীয়তা এবং স্কেলেবিলিটি আসে।

Content added By

Schema Evolution এবং Schema Compatibility অ্যাপাচি কাফকা (Apache Kafka) ব্যবহার করার সময় একটি গুরুত্বপূর্ণ বিষয়, বিশেষত যখন কাফকা টপিকের ডেটা ফরম্যাট পরিবর্তন হয় বা ভিন্ন অ্যাপ্লিকেশন দ্বারা ব্যবহার করা হয়। কাফকা ডেটার বিনিময়ে schema ব্যবহৃত হলে, ডেটার কাঠামো (structure) সঠিকভাবে সংজ্ঞায়িত করা প্রয়োজন। এক্ষেত্রে schema সংস্করণ (versioning) এবং এর মধ্যে সামঞ্জস্য (compatibility) নিশ্চিত করা অত্যন্ত গুরুত্বপূর্ণ।


Schema Evolution কী?

Schema Evolution হলো একটি প্রক্রিয়া, যার মাধ্যমে ডেটার কাঠামো সময়ের সাথে পরিবর্তিত হতে পারে, এবং নতুন স্কিমা পুরোনো স্কিমার সাথে সামঞ্জস্যপূর্ণ থাকে। এটি নিশ্চিত করে যে, ডেটা ফরম্যাটে কোনো পরিবর্তন করা হলেও, পূর্ববর্তী স্কিমা সহ ডেটা সরবরাহকারী অ্যাপ্লিকেশনগুলি এখনও সঠিকভাবে কাজ করবে। এই প্রক্রিয়াটি সাধারণত অ্যাপাচি অ্যাভ্রো (Apache Avro), JSON, বা পার্কেট (Parquet) ফরম্যাটে ব্যবহৃত স্কিমা ব্যবস্থাপনা নিয়ে কাজ করে।

Schema Evolution এর গুরুত্বপূর্ণ দিক:

  1. স্কিমার অবিচ্ছিন্নতা: একটি সিস্টেমে স্কিমা পরিবর্তিত হলেও পুরোনো ডেটা বা আগের স্কিমা অনুযায়ী ডেটা আপডেট হওয়ার দরকার হয় না। এতে নিশ্চিত হয় যে, নতুন এবং পুরোনো স্কিমার মধ্যে সামঞ্জস্য রক্ষিত থাকে।
  2. ডেটা ফরম্যাটের সংজ্ঞা: যখন ডেটা প্রেরণ করা হয় (যেমন, কাফকা টপিকে), স্কিমা ফরম্যাটের পরিবর্তন সঠিকভাবে পরিচালনা করতে হবে, যাতে ডেটার অর্থ ঠিক থাকে এবং প্রক্রিয়ায় কোনো ত্রুটি না ঘটে।

Schema Compatibility কী?

Schema Compatibility হল একটি ধারণা, যা স্কিমা পরিবর্তনের সময় পুরোনো স্কিমার সাথে নতুন স্কিমার সামঞ্জস্যপূর্ণ কিনা তা নির্ধারণ করে। এটি ডেটা গ্রহণকারী (consumer) এবং প্রেরণকারী (producer) উভয়ের জন্য গুরুত্বপূর্ণ, কারণ কোনো স্কিমার পরিবর্তনে যদি অসম্পূর্ণ বা অনুপযুক্ত ডেটা চলে আসে, তাহলে তা ব্যবস্থাপনায় সমস্যা সৃষ্টি করতে পারে।

Schema Compatibility এর ধরন:

  1. ব্যাকওয়ার্ড কম্প্যাটিবিলিটি (Backward Compatibility):
    • নতুন স্কিমা পুরোনো স্কিমার সাথে সামঞ্জস্যপূর্ণ থাকে। অর্থাৎ, পুরোনো স্কিমা দ্বারা লেখা ডেটা নতুন স্কিমা দ্বারা পড়া সম্ভব।
    • উদাহরণস্বরূপ, যদি একটি প্রডিউসার পুরোনো স্কিমা ব্যবহার করে এবং একটি কনজিউমার নতুন স্কিমা ব্যবহার করে, তাহলে কনজিউমার পুরোনো ডেটা সঠিকভাবে পড়তে পারবে।
  2. ফরওয়ার্ড কম্প্যাটিবিলিটি (Forward Compatibility):
    • পুরোনো স্কিমা নতুন স্কিমা দ্বারা সামঞ্জস্যপূর্ণ থাকে। অর্থাৎ, নতুন স্কিমা দ্বারা লেখা ডেটা পুরোনো স্কিমা দ্বারা পড়া সম্ভব।
    • এটি নিশ্চিত করে যে, যদি ভবিষ্যতে নতুন স্কিমা ব্যবহার করা হয়, তবে পুরোনো সিস্টেমে ডেটা পাঠানো বা গ্রহণ করা যাবে।
  3. দ্বিদ্বৈত কম্প্যাটিবিলিটি (Full Compatibility):
    • এটি ব্যাকওয়ার্ড এবং ফরওয়ার্ড কম্প্যাটিবিলিটির সংমিশ্রণ। অর্থাৎ, পুরোনো এবং নতুন স্কিমার মধ্যে ডেটা প্রক্রিয়া একে অপরের সাথে কাজ করতে পারে।

Schema Registry ব্যবহার

Schema Registry হল একটি কেন্দ্রীয় সিস্টেম, যা কাফকা টপিকের স্কিমা সংরক্ষণ এবং পরিচালনা করে। এটি কাফকা ডেটা স্ট্রিমের জন্য স্কিমা সংরক্ষণ, প্রযোজ্যতা এবং সংস্করণ ব্যবস্থাপনা সহজ করে তোলে। Schema Registry এর মাধ্যমে আপনি কাফকা প্রডিউসার এবং কনজিউমারদের মধ্যে স্কিমা পরিবর্তন এবং স্কিমার সামঞ্জস্য নিশ্চিত করতে পারেন।

Schema Registry এর কাজ:

  • স্কিমা সংরক্ষণ: Schema Registry তে সমস্ত স্কিমা সংরক্ষিত থাকে। যখন নতুন স্কিমা তৈরি হয়, তখন এটি সিস্টেমে নিবন্ধিত হয়।
  • স্কিমা ভ্যালিডেশন: প্রতিটি প্রডিউসার যখন নতুন ডেটা পাঠায়, Schema Registry নিশ্চিত করে যে পাঠানো ডেটা সঠিক স্কিমা অনুসরণ করছে।
  • স্কিমা সংস্করণিং: Schema Registry স্বয়ংক্রিয়ভাবে স্কিমার সংস্করণ তৈরি করে, এবং বিভিন্ন সংস্করণের স্কিমা একই সময়ে ব্যবহৃত হতে পারে।

Schema Evolution এবং Compatibility-র জন্য Best Practices

  1. স্কিমা ভার্সনিং:
    • প্রতিটি স্কিমা পরিবর্তন করার সময় একটি নতুন সংস্করণ তৈরি করুন। এভাবে, পুরোনো এবং নতুন সংস্করণের মধ্যে সংযোগ রাখা সম্ভব হয়।
    • স্কিমা পরিবর্তন করার সময়, ব্যাকওয়ার্ড বা ফরওয়ার্ড কম্প্যাটিবিলিটি নিশ্চিত করুন, যাতে পুরোনো এবং নতুন স্কিমার মধ্যে সামঞ্জস্য থাকে।
  2. স্কিমার বৈধতা চেক করা:
    • প্রতিটি প্রডিউসার ডেটা পাঠানোর আগে স্কিমা রেজিস্ট্রিতে স্কিমার বৈধতা পরীক্ষা করা উচিত। এতে নিশ্চিত করা যাবে যে পাঠানো ডেটা সঠিক কাঠামো অনুসরণ করছে।
  3. সুন্দর স্কিমা ডিজাইন:
    • স্কিমার পরিবর্তন হলে তা যেন সহজভাবে একে অপরের সাথে সামঞ্জস্যপূর্ণ থাকে, সেজন্য স্কিমা ডিজাইন করার সময় এটি মাথায় রাখুন। যেমন, ডেটার প্রয়োজনীয় ফিল্ডগুলো রাখা এবং অপরিহার্য ফিল্ডে কোনো পরিবর্তন না আনা।
  4. স্কিমা কনভার্টার ব্যবহার:
    • যদি আপনার ডেটা বিভিন্ন ধরনের ফরম্যাটে থাকে (যেমন, JSON, Avro, Protobuf), তবে স্কিমা কনভার্টার ব্যবহার করে বিভিন্ন স্কিমা ফরম্যাটের মধ্যে কনভার্সন নিশ্চিত করুন।

সারাংশ

Schema Evolution এবং Schema Compatibility কাফকা ডেটার দীর্ঘমেয়াদী ব্যবহারের জন্য গুরুত্বপূর্ণ। ডেটার কাঠামোতে পরিবর্তন করতে গেলে এটি নিশ্চিত করতে হবে যে, পুরোনো ডেটা এবং নতুন ডেটা উভয়ই সঠিকভাবে প্রসেস করা যায়। Schema Registry এই প্রক্রিয়া সহজ করে তোলে এবং স্কিমা সংস্করণ এবং বৈধতা চেকের মাধ্যমে ডেটার সামঞ্জস্য বজায় রাখতে সহায়তা করে। সঠিকভাবে স্কিমা ডিজাইন ও ব্যবহার করলে ডিস্ট্রিবিউটেড সিস্টেমে ডেটার অখণ্ডতা এবং নির্ভরযোগ্যতা বজায় রাখা সম্ভব।

Content added By

ডেটা প্রক্রিয়াকরণে সঠিক ডেটার গঠন বা schema বজায় রাখা অত্যন্ত গুরুত্বপূর্ণ। কাফকা ব্যবহারকারীরা যখন বিভিন্ন ধরনের ডেটা স্ট্রিম পাঠান, তখন ডেটার গঠন (যেমন JSON, Avro, বা Protobuf) সঠিকভাবে ফলো করা নিশ্চিত করতে Schema Registry ব্যবহার করা হয়। Schema Registry অ্যাপাচি কাফকা ইকোসিস্টেমের একটি অপরিহার্য অংশ, যা ডেটার গঠন বা schema সংরক্ষণ এবং যাচাই করার কাজ করে।

এই পর্বে আমরা জানবো কীভাবে Schema Registry দিয়ে কাফকা মেসেজগুলোর ডেটা ভ্যালিডেশন করা যায় এবং এর মাধ্যমে ডেটার সঠিক গঠন নিশ্চিত করা যায়।


Schema Registry কী?

Schema Registry একটি সার্ভিস, যা Apache Avro বা অন্যান্য সিরিয়ালাইজেশন ফরম্যাটের জন্য স্কিমা (schema) সংরক্ষণ এবং পরিচালনা করে। এটি একটি centralized repository হিসেবে কাজ করে, যেখানে কাফকা টপিকের জন্য ব্যবহৃত স্কিমাগুলি সংরক্ষিত থাকে। এতে ডেটার কাঠামো (structure) সঠিক কিনা তা যাচাই করার জন্য schema validation কার্যক্রম চালানো হয়।

Schema Registry এর উদ্দেশ্য:

  • Schema Evolution: স্কিমা পরিবর্তন করার জন্য কাফকা মেসেজগুলির সাথে সামঞ্জস্য রাখা।
  • Data Integrity: ডেটার সঠিক গঠন নিশ্চিত করা এবং ডুপ্লিকেট ডেটা বা ভুল ফরম্যাট ব্লক করা।
  • Version Control: স্কিমার বিভিন্ন সংস্করণ সংরক্ষণ করা এবং সঠিক সংস্করণ ব্যবহার নিশ্চিত করা।

Schema Registry এর কাজ কিভাবে হয়?

Schema Registry একটি RESTful API প্রদান করে, যার মাধ্যমে ব্যবহারকারীরা স্কিমা রেজিস্টার করতে, স্কিমার সংস্করণ পরিচালনা করতে এবং যাচাই করতে পারেন। কাফকা মেসেজ প্রেরণ করার সময়, প্রযোজক (producer) এবং গ্রাহক (consumer) উভয়কেই সঠিক স্কিমা অনুসরণ করতে হয়। সঠিক স্কিমা মেনে না চললে, ডেটা পাঠানো বা গ্রহণ করা সম্ভব হয় না।

Schema Registry এর প্রধান কার্যাবলী:

  1. Schema Registration: প্রযোজকরা যখন নতুন স্কিমা তৈরি করেন, তখন এটি Schema Registry তে নিবন্ধিত হয়।
  2. Schema Validation: ডেটা পাঠানোর আগে, স্কিমা যাচাই করা হয় যাতে এটি স্কিমা রেজিস্ট্রিতে সংরক্ষিত স্কিমার সাথে মিলে।
  3. Schema Evolution: স্কিমা পরিবর্তন হলে, পুরনো ডেটার সাথে সামঞ্জস্য বজায় রাখতে Schema Registry সংস্করণ ব্যবস্থাপনা করে। এটি নতুন স্কিমা সংস্করণকে পুরনো সংস্করণের সাথে সামঞ্জস্যপূর্ণ রাখার জন্য বৈধতা পরীক্ষা করে।

Schema Registry Integration এর মাধ্যমে Data Validation

Data Validation একাধিক স্তরে কাজ করে, যা নিশ্চিত করে যে শুধুমাত্র সঠিক এবং নির্ধারিত কাঠামো অনুসরণকারী ডেটাই কাফকা টপিকের মাধ্যমে প্রবাহিত হবে। এ জন্য Schema Registry ব্যবহার করে ডেটা যাচাই করা হয়।

১. Producer Side Validation

প্রযোজক (Producer) ডেটা প্রেরণের আগে, Schema Registry তে সংরক্ষিত স্কিমার সাথে ডেটার গঠন যাচাই করে। যদি প্রযোজক ডেটা প্রেরণের জন্য ভুল স্কিমা ব্যবহার করে, তাহলে স্কিমা রেজিস্ট্রির মাধ্যমে একটি ত্রুটি (error) প্রাপ্ত হবে এবং ডেটা প্রেরণ ব্লক হবে।

উদাহরণস্বরূপ, যখন একটি প্রযোজক অ্যাভ্রো (Avro) ফরম্যাটে ডেটা পাঠায়, তখন এই ফরম্যাটের সাথে সংশ্লিষ্ট স্কিমাটি Schema Registry থেকে যাচাই করা হয়। যদি ডেটা স্কিমার সাথে মিল না খায়, তবে এটি স্বীকৃত হবে না এবং একটি ত্রুটি (error) হবে।

ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, value);
producer.send(record);

যতক্ষণ না ডেটার স্কিমা Schema Registry এর সাথে মিলে, ততক্ষণ ডেটা পাঠানো সম্ভব হবে না।

২. Consumer Side Validation

কনসিউমার (Consumer) ডেটা গ্রহণ করার সময়, এটি স্কিমা রেজিস্ট্রির সাথে সংযুক্ত স্কিমার যাচাই করে। কনসিউমারের মধ্যে কোনো অসামঞ্জস্য হলে, ডেটা গ্রহণ করা যাবে না। এটি নিশ্চিত করে যে কনসিউমার শুধুমাত্র সঠিক স্কিমা অনুসরণকারী ডেটা গ্রহণ করছে।

৩. Schema Compatibility Check

Schema Registry স্কিমার সামঞ্জস্য বা compatibility যাচাই করার জন্য দুটি প্রধান পদ্ধতি ব্যবহার করে:

  • Backward Compatibility: নতুন স্কিমা পুরনো স্কিমার সাথে সঙ্গতিপূর্ণ কিনা তা যাচাই করা। এতে আগের মেসেজগুলো নতুন স্কিমার দ্বারা পাঠানো ডেটার সাথে সুসংগত থাকবে।
  • Forward Compatibility: পুরনো স্কিমা নতুন স্কিমার সাথে সঙ্গতিপূর্ণ কিনা তা যাচাই করা। এর মাধ্যমে পুরনো মেসেজগুলো নতুন স্কিমার দ্বারা প্রক্রিয়া করা সম্ভব হবে।

এভাবে, Schema Registry স্ট্রিম ডেটার কাঠামো নিশ্চিত করে এবং সামঞ্জস্যপূর্ণ স্কিমা বজায় রাখে।


Schema Registry ব্যবহার করে Data Validation এর সুবিধা

  1. Data Consistency: ডেটা প্রক্রিয়াকরণের প্রতিটি পর্যায়ে কাঠামোর সঠিকতা নিশ্চিত করা, যা ডেটা অখণ্ডতা এবং ধারাবাহিকতা বজায় রাখে।
  2. Error Prevention: ভুল বা অসম্পূর্ণ ডেটা স্কিমা ব্লক করা, যার ফলে সিস্টেমে ত্রুটি সৃষ্টি হওয়া রোধ করা হয়।
  3. Schema Evolution Management: যখন স্কিমার আপডেট বা পরিবর্তন হয়, তখন নতুন স্কিমা এবং পুরনো স্কিমার মধ্যে সামঞ্জস্য বজায় রাখা যায়।
  4. Enhanced Data Quality: কেবলমাত্র সঠিক কাঠামো অনুসরণকারী ডেটা প্রক্রিয়া করা হয়, ফলে ডেটার গুণগত মান বজায় থাকে।
  5. Reduced Complexity: প্রযোজক এবং কনসিউমারের জন্য স্কিমা ব্যবস্থাপনা সহজ করে, কারণ স্কিমার পরিবর্তন বা আপডেটের সময় এতে কোনো ঝামেলা থাকে না।

সারাংশ

Schema Registry কাফকা ইকোসিস্টেমের গুরুত্বপূর্ণ অংশ, যা ডেটার গঠন (schema) সংরক্ষণ, যাচাই এবং সংস্করণ ব্যবস্থাপনা নিশ্চিত করে। এটি data validation প্রক্রিয়াকে সহজতর করে এবং ডেটার সঠিকতা ও ধারাবাহিকতা বজায় রাখে। প্রযোজকরা যখন ডেটা পাঠান, তখন স্কিমা রেজিস্ট্রির মাধ্যমে স্কিমার সাথে মিলিয়ে যাচাই করা হয় এবং কনসিউমাররা স্কিমার ভিত্তিতে ডেটা গ্রহণ করে। স্কিমা ইভোলিউশন এবং সামঞ্জস্য নিশ্চিত করার মাধ্যমে Schema Registry কাফকা সিস্টেমে ডেটা প্রক্রিয়া করার গুণগত মান ও নির্ভরযোগ্যতা বাড়ায়।

Content added By
Promotion

Are you sure to start over?

Loading...