Mastering Real-Time Data Pipelines: Implementing CDC with Strimzi Kafka Connect from PostgreSQL to MinIO — Part-2

Ibrahim Halil Koyuncu
6 min readJun 29, 2024

--

In the previous article, I provided theoretical information about the Change Data Capture (CDC) structure and the Strimzi Kafka Connect Cluster architecture. In this article, we will focus on the practical implementation of creating a Source Kafka Connect Cluster and configuring it to build a data ingestion pipeline from PostgreSQL to Kafka. I highly recommend reading the previous article before diving into this part. However, if you’re already familiar with the architecture, feel free to continue from here. The topics covered in this article are listed below.

  • Setting Up the Environment
  • Create Source Kafka Connect Cluster and Build Connector Plugin Image
  • Initialize Kafka Connector Plugin
  • Test the CDC pipeline

Setting Up the Environment

Create Kafka Strimzi Cluster Operator and Kafka Cluster

To install Strimzi Kafka Operator follow instruction in below.

kubectl create namespace kafka # create namespace for kafka resources
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka # apply Strimzi crds
kubectl get pod -n kafka --watch # check Strimzi cluster operator

For this example, I will create a single-node Kafka cluster. You can customize your cluster based on your specific configuration needs. If there’s interest, I might later write a new article detailing how to create a Kafka cluster with a high availability (HA) setup ;)

kubectl apply -f https://strimzi.io/examples/latest/kafka/kraft/kafka-single-node.yaml -n kafka

You should see output like below. Your PVC size can be different from mine.

Kafka namespace resources

Create Source Kafka Connect Cluster and Build Connector Plugin Image

Our Kafka cluster is now ready! Next, we’ll create a Source Kafka Connect Cluster using the manifest below. I’ve explained each parameter so you can configure them based on your use case.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-cdc
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.7.0
bootstrapServers: my-cluster-kafka-bootstrap:9093
replicas: 1
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
# - secretName: harbor-cert
# certificate: harbor.crt
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
resources:
requests:
memory: 2Gi
cpu: 1
limits:
memory: 4Gi
cpu: 2
livenessProbe:
initialDelaySeconds: 30
timeoutSeconds: 5
readinessProbe:
initialDelaySeconds: 30
timeoutSeconds: 5
build:
output:
type: docker
image: your-registry/debezium-cdc:v1
pushSecret: harbor-token
#additionalKanikoOptions: [--insecure-registry,--skip-tls-verify]
plugins:
- name: postgres-connector
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.1.Final/debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sha512sum: 0d71db078e1868b0e312b4979e78bef4a01471ea1d7284000423d9e4fddbfde6dd1a30cd0908e75155843d0ac3a5f0a151f40e7ef67e9f2fc57af278b4b65cc8

— Annotations

strimzi.io/use-connector-resources: “true” ” : Configures this KafkaConnect to use KafkaConnector resources, avoiding the need to call the Connect REST API directly.

— Spec Parameters

“.version” : Specifies the version of Kafka to be used.

.replicas” : Number of Kafka Connect worker pods to be deployed.

.bootstrapServers” : Defines the address of the Kafka bootstrap servers.

  • “.tls.trustedCertificates” :
  • secretName: The name of the Kubernetes secret containing the CA certificate.
  • certificate: The name of the certificate file within the secret.

If you are using private registry like Harbor, you need to create a secret with CA file of your registry and add the secret information under the .tls.trustedCertificates parameters

— Config Parameters

“.group.id” : The consumer group ID for Kafka Connect workers

“.offset.storage.topic” : Topic to store connector offsets

“.config.storage.topic” : Topic to store connector configurations

“.status.storage.topic” : Topic to store connector status updates

“.config.storage.replication.factor” : Replication factor for the config storage topic (-1 uses the default broker configuration)

“.offset.storage.replication.factor” : Replication factor for the offset storage topic

“.status.storage.replication.factor” : Replication factor for the status storage topic

— Resources Parameters

Resource requests and limits for the Connect workers

— Liveness and Readiness Probes

“.initialDelaySeconds” : Delay before the liveness and readines probe is initiated.

“.timeoutSeconds” : Timeout for the liveness and readiness probe.

— Build Connector Plugin Image Parameters(Optional) : If you would like to create your custom connector image you can use this parameters structure.

“.output.type” : Specifies the build output type (e.g., docker)

“.output.image” : Docker image name to be used

“.output.pushSecret” : Secret for pushing the image to the registry

“plugins.name” : Name of the plugin

“.plugins.artifacts” : It’s type array so you can add multiple artifacts based on your use cases.( Such as adding artifacts for your each database type PostgreSQL, MSSql ..)

“.artifacts.type” : Type of the plugin artifact (e.g., tgz)

“.artifacts.url” : URL to download the plugin artifact

“.artifacts.sha512sum” : SHA-512 checksum to verify the integrity of the downloaded plugin

After applying the manifest, you can check the status using the command below. If everything is configured correctly, you should see the Kafka Connect Cluster status as ready.

Kafka Connect Cluster Output

Now lets continue with initialize the Kafka Connector instance inside the Kafka Connect Cluster.

Initialize Kafka Connector Plugin

A KafkaConnector resource defines and manages the configuration of a Kafka Connect connector. It allows users to deploy and manage connectors, providing a seamless and integrated approach to handle data streaming tasks within a Kafka Connect cluster. Through this resource, you can specify parameters for external data systems, such as authentication, converters, and transformers. Let’s take a closer look at it :)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: postgres-connector
namespace: kafka
labels:
strimzi.io/cluster: debezium-cdc
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 2
config:
topic.prefix: "ihk"
connector.class: "io.debezium.connector.postgresql.PostgresConnector"
database.hostname: postgresqlHostname
database.port: "5432"
database.user: "postgres"
plugin.name: "pgoutput"
database.password: "postgres"
database.dbname: "postgres"
database.server.name: "postgres"
value.converter: "org.apache.kafka.connect.json.JsonConverter"

— Annotations

“strimzi.io/cluster” : Identifies the KafkaConnect instance in which to create this connector. The KafkaConnect instance must have the strimzi.io/use-connector-resources annotation set to true

— Spec Parameters

“.class” : The fully qualified class name of the connector to use

“.taskMax” : The maximum number of tasks the connector can create for parallel processing

— Config Parameters

“.topic.prefix” : Prefix for Kafka topics where data changes will be published

“.connector.class” : Specifies the connector class to be used (same as class field)

“.database.hostname” : Hostname of the PostgreSQL database to connect to

“.database.port” : Port number of the PostgreSQL database

“.database.user” : Username for connecting to the PostgreSQL database

“.plugin.name” : The name of the plugin to use for capturing changes from PostgreSQL

“.database.password” : Password for connecting to the PostgreSQL database

“.database.dbname” : Name of the PostgreSQL database to connect to

“.database.server.name” : A logical name that uniquely identifies the database server and all recorded offsets

“.table.whitelist” : Limits data capture to the specified tables

“.value.converter” : Specifies the converter to use for the data values, in this case, JSON.

“.key.converter” : Converter to use for the keys in the Kafka records

After modifying KafkaConnector resource based on your use case apply it and you should see it’s successfully deployed like in below.

List Kafka Connector Resource

We have now completed the ingestion steps. When data is added or changed in our database, it will be tracked by our connectors and moved to Kafka. You can collect data from many databases by creating additional KafkaConnector resources, but remember to configure your KafkaConnect cluster accordingly :)

Test the CDC Pipeline

Our test case involves connecting to one of the broker pods and executing the relevant “.sh” file to consume data from the Kafka topic. This allows us to verify that the data has been correctly transferred from PostgreSQL to the Kafka topic.

kubectl get pods -n <namespace> -l app.kubernetes.io/name=kafka

kubectl exec -it kafka-broker-pod-name -n kafka-namespace -- /bin/bash

bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic topicName --from-beginning

--

--

Ibrahim Halil Koyuncu
Ibrahim Halil Koyuncu

No responses yet