Mastering Real-Time Data Pipelines: Implementing CDC with Strimzi Kafka Connect from PostgreSQL to MinIO — Part-3
In the previous article, we started by creating a Source Kafka Connect Cluster to ingest data from a PostgreSQL database and load it into Kafka. The next step is to move the ingested data from Kafka to MinIO in Apache Parquet format, preparing it for data analysis. If you’re not familiar with the architecture, I highly recommend starting with the first article, where I provided a brief overview of the entire architecture of a Kafka Connect Cluster in a Kubernetes environment. The topics covered in this article are listed below.
- What is the Schema Registry?
- How to create the Schema Registry
- How to add a record into the Schema Registry
- Create Sink Kafka Connect Cluster
- Create Kafka Connector Plugin to sink data from Kafka to MinIO
What is the Schema Registry?
Schema Registry is a service that manages and enforces data schemas for Kafka topics. It provides a central repository for schema definitions and ensures that the data being written to and read from Kafka topics adheres to these schemas.
Why Do We Need a Schema Registry?
- Consistency: Ensures data produced and consumed in Kafka follows a consistent format, reducing errors caused by schema mismatches.
- Compatibility: Manages schema evolution by enforcing compatibility rules, allowing schemas to evolve without breaking existing consumers.
- Centralized Management: Stores and manages all data schemas in one place, making it easier to enforce data quality and governance.
How Does It Work?
- Producer Side: When a producer writes data to a Kafka topic, it registers the schema with the Schema Registry and includes a schema ID with the data. This ensures the data being written adheres to the expected format.
- Consumer Side: When a consumer reads data from Kafka, it retrieves the schema ID and fetches the corresponding schema from the Schema Registry to deserialize the data correctly. This ensures the consumer can interpret the data as intended.
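To get a feel for this flow, you can talk to the Schema Registry REST API directly. The commands below are a minimal sketch, assuming the registry is reachable at http://schema-registry:8081 (the in-cluster address used later in this article, so run them from a pod inside the cluster or through a port-forward) and that at least one schema has already been registered:
# List every subject known to the registry
curl http://schema-registry:8081/subjects
# Fetch the schema behind a given schema ID (ID 1 is just an example)
curl http://schema-registry:8081/schemas/ids/1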
Plugins and Schema Registry
Some Kafka Connect plugins and other tools require schemas for their operations. For instance:
- Kafka Connect: Many connectors, especially those dealing with structured data formats (e.g., Avro, JSON Schema), rely on Schema Registry to validate and manage data schemas.
- Stream Processing Frameworks: Tools like Apache Flink or Kafka Streams can use Schema Registry to ensure the data they process is well-structured and adheres to expected formats.
How to Create the Schema Registry
To install Schema Registry in your Kubernetes cluster using Helm, follow these steps:
1-) Add Bitnami Helm Repository
helm repo add bitnami https://charts.bitnami.com/bitnami
2-) Use the “helm install” command to deploy Schema Registry with the necessary parameters. Replace “my-cluster-kafka-bootstrap:9092” with your actual Kafka bootstrap server address if different.
helm install schema-registry oci://registry-1.docker.io/bitnamicharts/schema-registry --namespace kafka --set kafka.bootstrapServers="my-cluster-kafka-bootstrap:9092" --set service.type=NodePort
3-) After running the command, verify that the Schema Registry has been successfully installed and is running:
kubectl get pods -n kafka
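Once the pod is running, a quick way to confirm the registry answers requests is to port-forward its service and hit the REST API. This assumes the Helm release exposed a service named schema-registry in the kafka namespace; adjust the name if your release differs:
kubectl port-forward svc/schema-registry 8081:8081 -n kafka &
curl http://localhost:8081/subjects
An empty list ([]) is the expected response on a fresh installation.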
How to Add a Record Into the Schema Registry
Adding a record to Schema Registry involves a few straightforward steps. Below, I’ll guide you through the process, from installing the necessary tools to sending a schema to the Schema Registry.
1-) Install “jq”
sudo apt install jq
2-) Create Schema File
Create an Avro schema file (written in JSON) that defines the structure of the data you want to store. The namespace field groups related schemas, which helps with schema organization. Here’s an example of what the schema file (schema.avsc) might look like:
{
  "type": "record",
  "name": "FinancialTransaction",
  "namespace": "com.ihk",
  "fields": [
    {
      "name": "transaction_id",
      "type": "int"
    },
    {
      "name": "account_id",
      "type": "int"
    },
    {
      "name": "transaction_type",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "transaction_date",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "description",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "status",
      "type": "string"
    }
  ]
}
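Before submitting the schema, it is worth confirming that the file is valid JSON. A simple check with the jq tool installed in step 1:
jq . schema.avsc
If the file parses, jq pretty-prints it; otherwise it reports where the syntax error is.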
3-) Prepare the Schema for Submission
The registry expects the schema embedded as a single JSON-escaped string inside the request payload. The jq flags below read the file as raw text (-R), slurp it into one string (-s), and output it as a quoted JSON string:
SCHEMA=$(cat schema.avsc | jq -Rs .)
4-) Send the Schema to Schema Registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "{\"schema\": $SCHEMA}" \
  http://schemaRegistry:schemaRegistryPort/subjects/ihk.public.financial_transactions/versions
Subject Name:
- The subject name is used to uniquely identify a schema within the Schema Registry. It is typically related to the Kafka topic name to provide a logical association between the data in Kafka and its schema.
- In this example, “ihk.public.financial_transactions” suggests that the schema is associated with a Kafka topic named “ihk.public.financial_transactions”.
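To confirm the registration succeeded, you can list the versions stored under the subject, using the same placeholder address as in the command above:
curl http://schemaRegistry:schemaRegistryPort/subjects/ihk.public.financial_transactions/versions
curl http://schemaRegistry:schemaRegistryPort/subjects/ihk.public.financial_transactions/versions/latest
The first call returns the list of version numbers; the second returns the full latest schema together with its ID.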
Create Sink Kafka Connect Cluster
Now, let’s continue by creating the Sink Kafka Connect Cluster using the manifest below. You can configure it based on your specific use case.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: minio-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: minio-connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    #key.converter: io.confluent.connect.avro.AvroConverter
    #value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry:8081
    value.converter.schema.registry.url: http://schema-registry:8081
  externalConfiguration:
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: awscred
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: awscred
            key: AWS_SECRET_ACCESS_KEY
  # Include required connector plugins
  build:
    output:
      type: docker
      image: yourRegistry/minio-sink:v1
      pushSecret: harbor-token
    plugins:
      - name: minio-sink
        artifacts:
          - type: zip
            url: https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/10.5.9/archive
The Kafka Connect cluster configuration includes secrets for accessing MinIO (or Amazon S3). These secrets are essential for securely connecting to the storage service.
kubectl create secret generic awscred --from-literal=AWS_ACCESS_KEY_ID=yourAccessKey --from-literal=AWS_SECRET_ACCESS_KEY=yourSecretKey --namespace=kafka
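You can quickly verify that the secret exists (without printing its values) before applying the KafkaConnect manifest:
kubectl get secret awscred -n kafka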
Since I already explained these parameters in the previous article, I am skipping the detailed explanation here.
After applying the manifest, you should see the KafkaConnect resource successfully deployed with its status reported as Ready, as shown below.
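If you prefer the command line over a dashboard, the same check can be done with kubectl. This is a minimal sketch that assumes you saved the manifest above as minio-connect-cluster.yaml (the filename is just an example):
kubectl apply -f minio-connect-cluster.yaml
kubectl get kafkaconnect minio-connect-cluster -n kafka
The READY column of the KafkaConnect resource should show True once the plugin image has been built and the deployment has rolled out.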
Create Kafka Connector Plugin to sink data from Kafka to MinIO
Having successfully deployed our sink cluster, we can now proceed with deploying the KafkaConnector resource to initialize the connector.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: minio-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: minio-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 2
  config:
    s3.bucket.name: kafka-cdc-poc
    s3.region: us-east-1
    store.url: 'yourObjectStorageAddress'
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    flush.size: 3
    topics: ihk.public.financial_transactions
    connector.class: 'io.confluent.connect.s3.S3SinkConnector'
    behavior.on.null.values: ignore
    key.converter.schema.registry.url: http://schema-registry:8081
    value.converter.schema.registry.url: http://schema-registry:8081
    storage.class: io.confluent.connect.s3.storage.S3Storage
Let’s explain some of the important config parameters.
- “flush.size”: The number of records to accumulate before flushing them to storage.
- “connector.class”: The class of the connector being used.
- “behavior.on.null.values”: Specifies how the connector behaves when it encounters null values; “ignore” means the record is skipped.
- “storage.class”: The storage implementation used for writing data. For S3-compatible storage such as MinIO, this is io.confluent.connect.s3.storage.S3Storage.
- “partitioner.class”: The class used for partitioning data in storage. Different partitioning strategies can be applied, such as time-based or field-based partitioning.
- “schema.compatibility”: Defines the compatibility mode for schemas (e.g., backward, forward, full, none).
After deploying the manifest, you should see the KafkaConnector resource successfully deployed with its status reported as Ready, as shown below.
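As before, the status can also be confirmed from the command line:
kubectl get kafkaconnector minio-sink -n kafka
kubectl describe kafkaconnector minio-sink -n kafka
The describe output includes the connector and task state reported by Kafka Connect, which is handy when the resource does not become Ready.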
Congratulations! Your sink pipeline is now ready. Whenever data changes in the PostgreSQL database, these changes are loaded into Kafka and then consumed by another KafkaConnector resource, which sinks the data into MinIO in Apache Parquet format. Now, let’s verify the data on the MinIO side as well.
As you can see, the files are written to MinIO under the path “kafka-cdc-poc/topics/ihk.public.financial_transactions/partition=0/”.
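If you prefer the MinIO client (mc) over the web console, the same objects can be listed from the terminal. This sketch assumes you have already configured an mc alias named myminio that points at your MinIO endpoint (the alias name is just an example):
mc ls --recursive myminio/kafka-cdc-poc/topics/ihk.public.financial_transactions/
Each Parquet file corresponds to a batch of records flushed by the connector (three per file with the flush.size configured above).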