CDC with Debezium
What is Debezium?
Debezium is a distributed open source Change Data Capture (CDC) platform. It captures database changes and propagates them as events to Apache Kafka.
Supported databases: PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Db2, Cassandra.
PostgreSQL — CDC configuration
To enable CDC, PostgreSQL must be configured with wal_level=logical:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cdc-postgresql
  namespace: kafka-cdc
spec:
  template:
    spec:
      containers:
        - name: postgresql
          image: registry.redhat.io/rhel9/postgresql-16:latest
          args:
            - "-c"
            - "wal_level=logical"
            - "-c"
            - "max_replication_slots=4"
            - "-c"
            - "max_wal_senders=4"
Sample data (seed)
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    product VARCHAR(255) NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE PUBLICATION cdc_publication FOR TABLE customers, orders;
KafkaConnect with Debezium
The Debezium connector is deployed inside KafkaConnect using Strimzi’s build feature, which builds a custom image with the required plugins:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: cdc-connect
  namespace: kafka-cdc
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: "4.0.0"
  replicas: 2
  bootstrapServers: cdc-cluster-kafka-bootstrap:9093
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/kafka-cdc/cdc-connect:latest
    plugins:
      - name: debezium-postgresql
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 2.7.3.Final
      - name: http-sink
        artifacts:
          - type: maven
            group: io.aiven
            artifact: http-connector-for-apache-kafka
            version: 0.7.0
The strimzi.io/use-connector-resources: "true" annotation lets you manage connectors as KafkaConnector resources (GitOps-friendly).
KafkaConnector — Debezium source
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgresql-source
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: cdc-postgresql
    database.port: "5432"
    database.user: cdcuser              # (1)
    database.password: cdc-demo-2024    # (1)
    database.dbname: cdcdb
    topic.prefix: cdc
    schema.include.list: public
    table.include.list: "public.customers,public.orders"
    plugin.name: pgoutput
    publication.name: cdc_publication
    slot.name: debezium_cdc
    snapshot.mode: initial
(1) In the deployed Helm chart, credentials are injected from values.yaml via templates: {{ .Values.postgresql.username }} and {{ .Values.postgresql.password }}. The values shown here are the demo defaults.
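Debezium names each change topic after the topic prefix, schema and table, which is where the cdc.public.customers and cdc.public.orders topics used later on this page come from. The following is a minimal Python sketch of that naming rule; the helper name debezium_topics is hypothetical and is not part of Debezium or Strimzi.

```python
def debezium_topics(topic_prefix: str, table_include_list: str) -> list[str]:
    """Derive change topic names as <topic.prefix>.<schema>.<table>.

    Hypothetical helper for illustration only. It assumes the include list
    contains plain schema.table names, as in the connector config above,
    and does not handle regular expressions.
    """
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]


# Values copied from the KafkaConnector config above.
topics = debezium_topics("cdc", "public.customers,public.orders")
print(topics)
```

With the configuration shown above, this yields the two topics that the HTTP sink subscribes to.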
How it Works
PostgreSQL logical replication
Debezium does not run SQL queries to capture changes — it uses PostgreSQL’s native logical replication protocol:
- PostgreSQL records every change in the Write-Ahead Log (WAL) before it is applied to the data files. With wal_level=logical, the WAL contains enough information to decode row-level changes.
- Debezium creates a replication slot (debezium_cdc) that acts as a persistent cursor in the WAL. PostgreSQL retains WAL segments until the slot has consumed them, so changes made while the connector is offline are not lost.
- The pgoutput plugin decodes binary WAL changes into structured logical messages (table, columns, before/after values).
- The PUBLICATION (cdc_publication) filters which tables are replicated: only customers and orders generate events.
CDC event structure
Each Debezium event has this structure:
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": { "id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io" },
    "source": {
      "version": "2.7.3.Final",
      "connector": "postgresql",
      "db": "cdcdb",
      "schema": "public",
      "table": "customers",
      "lsn": 234881048,
      "txId": 550
    },
    "op": "c",
    "ts_ms": 1710000000000
  }
}
- op: operation type, one of c (create/INSERT), u (update), d (delete), r (read/snapshot)
- before: previous row state (null on INSERT; on UPDATE/DELETE its contents depend on the table's REPLICA IDENTITY setting)
- after: new row state (null on DELETE)
- source.lsn: Log Sequence Number, the exact position in the PostgreSQL WAL
- source.txId: transaction ID, which correlates multiple changes from the same transaction
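To make the field descriptions concrete, here is a minimal Python sketch of how a consumer might read one event. It assumes the JSON envelope with schema and payload shown above. The names summarize_event, format_lsn and SAMPLE_EVENT are hypothetical, and the empty "schema" object is a placeholder for the part elided in the example.

```python
import json

# Operation codes as listed above.
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}

# Sample event mirroring the structure shown above ("schema" is a placeholder).
SAMPLE_EVENT = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"id": 1, "first_name": "Demo", "last_name": "User",
                  "email": "demo@test.io"},
        "source": {"version": "2.7.3.Final", "connector": "postgresql",
                   "db": "cdcdb", "schema": "public", "table": "customers",
                   "lsn": 234881048, "txId": 550},
        "op": "c",
        "ts_ms": 1710000000000,
    },
})


def format_lsn(lsn: int) -> str:
    """Render a numeric LSN in PostgreSQL's high/low hexadecimal notation."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def summarize_event(raw: str) -> dict:
    """Extract the fields most useful for routing from one event."""
    payload = json.loads(raw)["payload"]
    source = payload["source"]
    # DELETE events carry the row in "before"; other operations use "after".
    row = payload["before"] if payload["op"] == "d" else payload["after"]
    return {
        "operation": OPERATIONS.get(payload["op"], "UNKNOWN"),
        "table": f'{source["schema"]}.{source["table"]}',
        "lsn": format_lsn(source["lsn"]),
        "tx_id": source["txId"],
        "row": row,
    }


print(summarize_event(SAMPLE_EVENT))
```

For the sample event, the summary reports an INSERT on public.customers at LSN 0/E000018, which is the form PostgreSQL itself uses when displaying WAL positions.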
KafkaConnect task distribution
KafkaConnect distributes work via tasks:
- Each connector declares how many tasks it needs (tasksMax). The Debezium source connector always uses 1 task (PostgreSQL limitation: the connector reads from a single replication slot).
- The HTTP sink can use multiple tasks, up to one per input topic partition.
- If a Connect replica fails, the group coordinator reassigns tasks to the remaining replicas in ~30 seconds.
- Each task's offsets are persisted in the cdc-connect-offsets topic (RF=3), so another replica can resume from the exact point.
KafkaConnector — HTTP sink (Mailpit)
CDC events are also sent as email notifications to Mailpit:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mailpit-http-sink
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.aiven.kafka.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: "cdc.public.customers,cdc.public.orders"
    http.url: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send"
    http.authorization.type: none
    http.headers.content.type: "application/json"
Dead Letter Queue (DLQ)
Messages that fail processing are not lost — they are routed to DLQ topics for later analysis.
DLQ in KafkaConnectors
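Both the source and sink connectors set the errors.* options below. Note that Kafka Connect applies the errors.deadletterqueue.* settings only to sink connectors, so in practice only the HTTP sink writes failed records to the DLQ topic: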
config:
  errors.tolerance: all
  errors.deadletterqueue.topic.name: dlq.cdc-errors
  errors.deadletterqueue.topic.replication.factor: 3
  errors.deadletterqueue.context.headers.enable: true
- errors.tolerance: all: the connector keeps running even if a record fails
- errors.deadletterqueue.topic.name: topic where failed messages are sent
- errors.deadletterqueue.context.headers.enable: adds headers with the original error for diagnostics
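When context headers are enabled, Kafka Connect attaches the error details to each DLQ record as headers whose keys start with __connect.errors. The Python sketch below shows one way to collect them for diagnostics. It assumes headers arrive as a list of (key, bytes) pairs, which is how common Python Kafka clients expose them. The function name error_context and the sample values are hypothetical.

```python
ERROR_HEADER_PREFIX = "__connect.errors."


def error_context(headers):
    """Collect Kafka Connect error-context headers into a plain dict.

    `headers` is a list of (key, value) pairs where value is bytes or None.
    Headers that do not belong to the error context are ignored.
    """
    context = {}
    for key, value in headers:
        if key.startswith(ERROR_HEADER_PREFIX):
            name = key[len(ERROR_HEADER_PREFIX):]
            context[name] = value.decode("utf-8") if value is not None else None
    return context


# Illustrative headers only; real values depend on the failure.
sample_headers = [
    ("__connect.errors.topic", b"cdc.public.customers"),
    ("__connect.errors.partition", b"0"),
    ("__connect.errors.offset", b"42"),
    ("__connect.errors.exception.message", b"example failure"),
    ("unrelated-header", b"ignored"),
]

for name, value in error_context(sample_headers).items():
    print(f"{name}: {value}")
```

The original topic, partition and offset in these headers identify which source record failed, which helps when reprocessing messages from the DLQ.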
DLQ in Apache Camel
The Camel route uses onException to catch errors and send them to a separate DLQ topic:
onException:
  - exceptions:
      - java.lang.Exception
    handled:
      constant: true
    steps:
      - to:
          uri: kafka:dlq.cdc-camel-errors
          parameters:
            brokers: cdc-cluster-kafka-bootstrap.kafka-cdc.svc:9092
      - log:
          message: "Error sent to DLQ: ${exception.message}"
DLQ topics
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: dlq.cdc-errors
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 2592000000
DLQ topics retain messages for 30 days (2592000000 ms) to allow analysis and reprocessing.
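The retention.ms value is a day count converted to milliseconds. As a quick check of that arithmetic, and as a way to derive values for other retention periods, a small Python sketch follows; the helper name retention_ms is hypothetical.

```python
from datetime import timedelta


def retention_ms(days: int) -> int:
    """Convert a retention period in days to milliseconds for retention.ms."""
    # Dividing one timedelta by another yields an exact integer count.
    return timedelta(days=days) // timedelta(milliseconds=1)


print(retention_ms(30))  # 30 days * 24 h * 60 min * 60 s * 1000 ms
```

For 30 days this gives 2592000000, matching the value in the KafkaTopic above.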
To inspect messages on the DLQ:
oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic dlq.cdc-errors \
--from-beginning --max-messages 5
Testing CDC
Connect to the PostgreSQL database and run an INSERT:
oc exec -it deploy/cdc-postgresql -n kafka-cdc -- psql -U cdcuser -d cdcdb -c \
"INSERT INTO customers (first_name, last_name, email) VALUES ('Demo', 'User', 'demo@test.io');"
Verify the event reached the topic:
oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic cdc.public.customers \
--from-beginning --max-messages 1
You can also view the email in Mailpit: https://n8n-mailpit-openshift-lightspeed.
Official Documentation
- Red Hat build of Debezium: Deploying and configuring CDC connectors
- Debezium Reference Documentation: Full upstream project documentation
- Debezium PostgreSQL Connector: PostgreSQL-specific configuration (WAL, slots, publications)
- KafkaConnect on OpenShift: Deploying KafkaConnect with Strimzi
- PostgreSQL on RHEL 9: Configuring PostgreSQL on Red Hat Enterprise Linux