CDC with Debezium

What is Debezium?

Debezium is a distributed open source Change Data Capture (CDC) platform. It captures database changes and propagates them as events to Apache Kafka.

Supported databases include PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Db2, and Cassandra, among others.

PostgreSQL — CDC configuration

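To enable CDC, PostgreSQL must run with wal_level=logical. The Deployment excerpt below passes it as a server option, along with max_replication_slots and max_wal_senders, which cap the number of replication slots and of concurrent replication connections (each Debezium connector uses one of each):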

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cdc-postgresql
  namespace: kafka-cdc
spec:
  template:
    spec:
      containers:
        - name: postgresql
          image: registry.redhat.io/rhel9/postgresql-16:latest
          args:
            - "-c"
            - "wal_level=logical"
            - "-c"
            - "max_replication_slots=4"
            - "-c"
            - "max_wal_senders=4"
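Before deploying the connector, it can help to check that these server options really enable logical decoding. The sketch below is illustrative only: parse_pg_args and cdc_problems are hypothetical helpers (not part of Debezium or PostgreSQL), and PG_DEFAULTS assumes the built-in defaults of PostgreSQL 10 and later (wal_level=replica, 10 replication slots, 10 WAL senders).

```python
# Hypothetical pre-flight check; not part of Debezium or PostgreSQL.
# Built-in defaults since PostgreSQL 10 for the settings the connector relies on.
PG_DEFAULTS = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}


def parse_pg_args(args: list[str]) -> dict[str, str]:
    """Collect the name=value pairs that follow each "-c" flag."""
    settings: dict[str, str] = {}
    it = iter(args)
    for arg in it:
        if arg == "-c":
            # The value is the next list element, e.g. "wal_level=logical".
            name, _, value = next(it, "").partition("=")
            if name:
                settings[name] = value
    return settings


def cdc_problems(settings: dict[str, str], slots_needed: int = 1) -> list[str]:
    """List what would block logical decoding; an empty list means the settings look ready."""
    effective = {**PG_DEFAULTS, **settings}
    problems = []
    if effective["wal_level"] != "logical":
        problems.append(f"wal_level must be 'logical' (got '{effective['wal_level']}')")
    # Each Debezium connector holds one replication slot and one WAL sender connection.
    for name in ("max_replication_slots", "max_wal_senders"):
        if int(effective[name]) < slots_needed:
            problems.append(f"{name} must be at least {slots_needed} (got {effective[name]})")
    return problems


args = ["-c", "wal_level=logical", "-c", "max_replication_slots=4", "-c", "max_wal_senders=4"]
print(cdc_problems(parse_pg_args(args)))  # []
```

On a running database, the effective values can be read directly with SHOW wal_level; (and likewise for the other two settings).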

Sample data (seed)

CREATE TABLE customers (
  id SERIAL PRIMARY KEY,
  first_name VARCHAR(100) NOT NULL,
  last_name VARCHAR(100) NOT NULL,
  email VARCHAR(255) UNIQUE NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  customer_id INTEGER REFERENCES customers(id),
  product VARCHAR(255) NOT NULL,
  amount DECIMAL(10,2) NOT NULL,
  status VARCHAR(50) DEFAULT 'pending',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE PUBLICATION cdc_publication FOR TABLE customers, orders;

KafkaConnect with Debezium

The Debezium connector is deployed inside KafkaConnect using Strimzi’s build feature, which builds a custom image with the required plugins:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: cdc-connect
  namespace: kafka-cdc
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: "4.0.0"
  replicas: 2
  bootstrapServers: cdc-cluster-kafka-bootstrap:9093
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/kafka-cdc/cdc-connect:latest
    plugins:
      - name: debezium-postgresql
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 2.7.3.Final
      - name: http-sink
        artifacts:
          - type: maven
            group: io.aiven
            artifact: http-connector-for-apache-kafka
            version: 0.7.0
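
The strimzi.io/use-connector-resources: "true" annotation tells the Strimzi Cluster Operator to manage connectors declared as KafkaConnector custom resources rather than through the Kafka Connect REST API, so connector configuration can live in Git alongside the other manifests (GitOps-friendly).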

KafkaConnector — Debezium source

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgresql-source
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: cdc-postgresql
    database.port: "5432"
    database.user: cdcuser                          (1)
    database.password: cdc-demo-2024                 (1)
    database.dbname: cdcdb
    topic.prefix: cdc
    schema.include.list: public
    table.include.list: "public.customers,public.orders"
    plugin.name: pgoutput
    publication.name: cdc_publication
    slot.name: debezium_cdc
    snapshot.mode: initial
1 In the deployed Helm chart, credentials are injected from values.yaml via templates: {{ .Values.postgresql.username }} and {{ .Values.postgresql.password }}. The values shown here are the demo defaults.
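The connector settings decide both which tables are captured and which topics receive their events. A minimal sketch, assuming Debezium's default topic naming (<topic.prefix>.<schema>.<table>, which yields cdc.public.customers here) and that each table.include.list entry is matched as an anchored regular expression against the schema.table identifier; topic_name and is_captured are hypothetical helpers, and case handling is not modeled:

```python
import re


def topic_name(prefix: str, schema: str, table: str) -> str:
    """Debezium's default topic naming: <topic.prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"


def is_captured(table_id: str, include_list: str) -> bool:
    """True if 'schema.table' matches any comma-separated pattern as a whole (anchored) regex."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, table_id) for p in patterns)


include = "public.customers,public.orders"
for table in ("public.customers", "public.orders", "public.customers_archive"):
    if is_captured(table, include):
        schema, name = table.split(".", 1)
        print(table, "->", topic_name("cdc", schema, name))
```

Because the patterns are anchored, public.customers does not also capture a table such as public.customers_archive; capturing both would need a pattern like public.customers.*.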

How it Works

PostgreSQL logical replication

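Debezium does not poll the tables with SQL queries to capture changes. Apart from the initial snapshot (snapshot.mode: initial), which reads the existing rows once, it streams changes through PostgreSQL’s native logical replication protocol:

  1. PostgreSQL records every change in the Write-Ahead Log (WAL) before applying it to the data files. With wal_level=logical, the WAL also contains enough information to decode row-level changes.

  2. Debezium creates a replication slot (debezium_cdc) that acts as a persistent cursor in the WAL. PostgreSQL keeps all WAL the slot has not yet confirmed, so changes made while the connector is down are delivered when it reconnects (at-least-once: after a restart some events may be delivered again). The trade-off is that an inactive slot makes WAL accumulate on disk until it is consumed or the slot is dropped.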

  3. The pgoutput plugin decodes binary WAL changes into structured logical messages (table, columns, before/after values).

  4. The PUBLICATION (cdc_publication) filters which tables are replicated — only customers and orders generate events.
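The slot behavior in steps 1 and 2 can be illustrated with a deliberately simplified in-memory model. ToyWal is hypothetical and is not how PostgreSQL stores WAL: real WAL is recycled per segment based on each slot's restart_lsn, whereas here single records stand in for segments.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWal:
    """Toy model of a WAL with logical replication slots (not PostgreSQL's implementation)."""
    records: list[tuple[int, str]] = field(default_factory=list)  # (lsn, change)
    slots: dict[str, int] = field(default_factory=dict)           # slot name -> confirmed LSN
    next_lsn: int = 1

    def append(self, change: str) -> int:
        """Write a change at the next LSN."""
        lsn = self.next_lsn
        self.records.append((lsn, change))
        self.next_lsn += 1
        return lsn

    def create_slot(self, name: str) -> None:
        """A new slot starts at the current end of the WAL."""
        self.slots[name] = self.next_lsn - 1

    def read(self, slot: str) -> list[tuple[int, str]]:
        """Return everything after the slot's confirmed position; reading does not advance it."""
        return [(lsn, c) for lsn, c in self.records if lsn > self.slots[slot]]

    def confirm(self, slot: str, lsn: int) -> None:
        """The consumer acknowledges it has durably processed everything up to lsn."""
        self.slots[slot] = max(self.slots[slot], lsn)
        self._recycle()

    def _recycle(self) -> None:
        # Only WAL that every slot has confirmed may be removed; the slowest slot wins.
        oldest = min(self.slots.values(), default=self.next_lsn - 1)
        self.records = [(lsn, c) for lsn, c in self.records if lsn > oldest]


wal = ToyWal()
wal.create_slot("debezium_cdc")
wal.append("INSERT customers id=1")
wal.append("INSERT orders id=1")
print(wal.read("debezium_cdc"))  # both changes; a crash before confirming replays them
wal.confirm("debezium_cdc", 1)   # the consumer acknowledges LSN 1
print(wal.read("debezium_cdc"))  # [(2, 'INSERT orders id=1')]
```

The model shows both sides of the guarantee: nothing is discarded until the slot confirms it, and an unconfirmed slot pins everything after its position.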

CDC event structure

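With the JSON converter and schemas enabled (which produces the schema/payload wrapper), each Debezium change event value has this structure (abridged):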

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": { "id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io" },
    "source": {
      "version": "2.7.3.Final",
      "connector": "postgresql",
      "db": "cdcdb",
      "schema": "public",
      "table": "customers",
      "lsn": 234881048,
      "txId": 550
    },
    "op": "c",
    "ts_ms": 1710000000000
  }
}
  • op: operation type — c (create/INSERT), u (update), d (delete), r (read/snapshot)

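  • before: previous row state (null on INSERT). On UPDATE and DELETE it contains the full previous row only if the table uses REPLICA IDENTITY FULL; with the default replica identity it carries at most the primary-key columns

  • after: new row state (null on DELETE)

  • source.lsn: Log Sequence Number, the exact position in the PostgreSQL WAL

  • source.txId: transaction ID, which correlates multiple changes made in the same transaction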
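A consumer typically branches on op and picks after or before accordingly. A minimal sketch, assuming JSON-encoded values with or without the schema/payload wrapper; summarize is a hypothetical helper. By default Debezium also follows each delete event with a tombstone (a record with a null value), which the sketch handles as well.

```python
import json

OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(message_value: str) -> str:
    """Describe one Debezium change event in a single line."""
    doc = json.loads(message_value)
    if doc is None:
        return "tombstone"  # null value emitted after a delete
    payload = doc.get("payload", doc)  # schemas enabled: wrapped; disabled: top level
    src = payload["source"]
    # DELETE has no "after", so fall back to "before" to identify the row.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    row_id = row.get("id") if row else None
    op = OP_NAMES.get(payload["op"], payload["op"])
    return f"{op} {src['schema']}.{src['table']} id={row_id} lsn={src['lsn']} txId={src['txId']}"


sample = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io"},
        "source": {"schema": "public", "table": "customers", "lsn": 234881048, "txId": 550},
        "op": "c",
        "ts_ms": 1710000000000,
    },
})
print(summarize(sample))  # insert public.customers id=1 lsn=234881048 txId=550
```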

KafkaConnect task distribution

KafkaConnect distributes work via tasks:

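  1. Each connector declares the maximum number of tasks it may run (tasksMax). The Debezium PostgreSQL connector always runs a single task, whatever tasksMax says, because it streams from one replication slot and a slot can be read by only one client at a time.

  2. Sink connectors such as the HTTP sink can run several tasks. The partitions of the input topics are spread across them, so useful parallelism is capped by the total number of partitions.

  3. If a Connect replica fails, the group coordinator notices once the worker's session times out, and the failed worker's tasks are reassigned to the remaining replicas. With the default incremental cooperative rebalancing, reassignment can be postponed by up to scheduled.rebalance.max.delay.ms (5 minutes by default) to give the failed worker a chance to rejoin.

  4. Source offsets (for Debezium, its position in the WAL) are persisted in the cdc-connect-offsets topic (RF=3), while sink tasks commit regular consumer-group offsets. Either way, another replica can resume from the last committed position; events after that point may be delivered twice.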
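The failover idea can be sketched with a plain round-robin assignment. This is a simplification, not Kafka Connect's incremental cooperative rebalancing algorithm; assign, the worker names, and the stored offset value are hypothetical, and the task IDs only follow Connect's <connector>-<index> naming for illustration.

```python
def assign(tasks: list[str], workers: list[str]) -> dict[str, list[str]]:
    """Spread tasks round-robin over the live workers (simplified stand-in for Connect's rebalancing)."""
    if not workers:
        raise RuntimeError("no live Connect workers")
    ordered = sorted(workers)
    plan: dict[str, list[str]] = {w: [] for w in ordered}
    for i, task in enumerate(sorted(tasks)):
        plan[ordered[i % len(ordered)]].append(task)
    return plan


# Illustrative committed offset. It lives in Kafka (cdc-connect-offsets for source tasks),
# not inside a worker, so whichever worker takes over the task can look it up.
committed = {"debezium-postgresql-source-0": {"lsn": 234881048}}

tasks = ["debezium-postgresql-source-0", "mailpit-http-sink-0"]
print(assign(tasks, ["worker-a", "worker-b"]))
# worker-a fails: the surviving worker takes over every task and resumes from the stored offset.
print(assign(tasks, ["worker-b"]), committed["debezium-postgresql-source-0"])
```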

KafkaConnector — HTTP sink (Mailpit)

CDC events are also sent as email notifications to Mailpit:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mailpit-http-sink
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.aiven.kafka.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: "cdc.public.customers,cdc.public.orders"
    http.url: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send"
    http.authorization.type: none
    http.headers.content.type: "application/json"
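This page does not show how a change event is turned into an email. One possible mapping is sketched below, assuming Mailpit's /api/v1/send endpoint accepts a JSON body with From, To, Subject, and Text fields (verify against the Mailpit API reference); to_mailpit_message and both email addresses are hypothetical.

```python
import json

# Hypothetical addresses; replace with real ones.
SENDER = "cdc@example.com"
RECIPIENT = "ops@example.com"


def to_mailpit_message(event: dict, sender: str = SENDER, recipient: str = RECIPIENT) -> dict:
    """Map a Debezium change event to a Mailpit send request body (field names assumed)."""
    payload = event.get("payload", event)  # works with or without the schema/payload wrapper
    src = payload["source"]
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return {
        "From": {"Email": sender},
        "To": [{"Email": recipient}],
        "Subject": f"CDC op={payload['op']} on {src['schema']}.{src['table']}",
        "Text": json.dumps(row, sort_keys=True),
    }


event = {
    "payload": {
        "before": None,
        "after": {"id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io"},
        "source": {"schema": "public", "table": "customers"},
        "op": "c",
    }
}
print(json.dumps(to_mailpit_message(event), indent=2))
```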

Dead Letter Queue (DLQ)

Messages that fail processing are not lost — they are routed to DLQ topics for later analysis.

DLQ in KafkaConnectors

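Both the source and sink connectors set the properties below, but Kafka Connect honors the errors.deadletterqueue.* settings only for sink connectors. On the Debezium source, errors.tolerance still applies, but failed records are skipped (and logged if errors.log.enable is set) rather than written to the DLQ: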

config:
  errors.tolerance: all
  errors.deadletterqueue.topic.name: dlq.cdc-errors
  errors.deadletterqueue.topic.replication.factor: 3
  errors.deadletterqueue.context.headers.enable: true
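
  • With errors.tolerance: all, records that fail in a converter or transformation are skipped instead of stopping the task

  • errors.deadletterqueue.topic.name sets the topic that receives the failed records

  • errors.deadletterqueue.context.headers.enable adds __connect.errors.* headers (original topic, partition, offset, exception class and message) to each DLQ record for diagnostics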
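The behavior of errors.tolerance at the conversion stage can be sketched as follows. ToyErrorHandler is a hypothetical toy, not Kafka Connect's code: the header names follow Connect's __connect.errors.* context headers, but the values are simplified (real exception class names are fully qualified Java names), and json.loads stands in for a converter.

```python
import json
from dataclasses import dataclass, field


@dataclass
class ToyErrorHandler:
    """Toy model of errors.tolerance plus a DLQ for the conversion stage (not Kafka Connect's code)."""
    tolerance: str = "none"                        # mirrors errors.tolerance: "none" or "all"
    dlq: list = field(default_factory=list)        # stands in for the dlq.cdc-errors topic
    converted: list = field(default_factory=list)  # records that made it through

    def run(self, records, convert=json.loads):
        for topic, partition, offset, raw in records:
            try:
                value = convert(raw)
            except Exception as exc:
                if self.tolerance != "all":
                    raise  # errors.tolerance=none: the task stops on the first failure
                # Context headers, named after Connect's __connect.errors.* headers (values simplified).
                self.dlq.append({
                    "value": raw,
                    "headers": {
                        "__connect.errors.topic": topic,
                        "__connect.errors.partition": partition,
                        "__connect.errors.offset": offset,
                        "__connect.errors.exception.class.name": type(exc).__name__,
                        "__connect.errors.exception.message": str(exc),
                    },
                })
                continue
            self.converted.append(value)


handler = ToyErrorHandler(tolerance="all")
handler.run([("cdc.public.customers", 0, 0, '{"id": 1}'), ("cdc.public.customers", 0, 1, "not json")])
print(handler.converted)                               # [{'id': 1}]
print(handler.dlq[0]["headers"]["__connect.errors.offset"])  # 1
```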

DLQ in Apache Camel

The Camel route uses onException to catch errors and send them to a separate DLQ topic:

- onException:
    exception:
      - java.lang.Exception
    handled:
      constant: true
    steps:
      - to:
          uri: kafka:dlq.cdc-camel-errors
          parameters:
            brokers: cdc-cluster-kafka-bootstrap.kafka-cdc.svc:9092
      - log:
          message: "Error sent to DLQ: ${exception.message}"
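The effect of handled: true can be sketched as a plain function: the failing message is routed to the DLQ and logged, and the error does not propagate to the caller. route is a hypothetical toy, not Camel's API; it sends the body as it was when processing failed, which matches this single-step toy but not necessarily a multi-step Camel route.

```python
def route(body, process, dlq: list, handled: bool = True):
    """Toy model of an onException clause: on failure, send the body to the DLQ and log it."""
    try:
        return process(body)
    except Exception as exc:
        dlq.append(body)                     # the "to: kafka:dlq.cdc-camel-errors" step
        print(f"Error sent to DLQ: {exc}")   # the "log" step
        if not handled:
            raise                            # handled=false: the error still reaches the caller
        return None                          # handled=true: the caller sees no exception


dead = []
print(route('{"id": 1}', str.upper, dead))
route("bad", lambda b: 1 / 0, dead)  # prints "Error sent to DLQ: division by zero"
print(dead)                          # ['bad']
```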

DLQ topics

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: dlq.cdc-errors
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 2592000000

DLQ topics retain messages for 30 days (2592000000 ms) to allow analysis and reprocessing.
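retention.ms is expressed in milliseconds, so the value in the KafkaTopic is 30 days converted. A tiny helper (hypothetical name) makes the arithmetic explicit and reusable for other retention periods:

```python
def retention_ms(days: int) -> int:
    """Convert a retention period in days to the milliseconds expected by retention.ms."""
    return days * 24 * 60 * 60 * 1000


print(retention_ms(30))  # 2592000000
```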

To inspect messages on the DLQ, including their error-context headers:

oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dlq.cdc-errors \
  --property print.headers=true \
  --from-beginning --max-messages 5

Testing CDC

Connect to the PostgreSQL database and run an INSERT:

oc exec -it deploy/cdc-postgresql -n kafka-cdc -- psql -U cdcuser -d cdcdb -c \
  "INSERT INTO customers (first_name, last_name, email) VALUES ('Demo', 'User', 'demo@test.io');"

Verify the event reached the topic:

oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc.public.customers \
  --from-beginning --max-messages 1

You can also view the email in Mailpit: https://n8n-mailpit-openshift-lightspeed.

Official Documentation