CDC con Debezium

¿Qué es Debezium?

Debezium es una plataforma open source de Change Data Capture (CDC) distribuida. Captura cambios de bases de datos y los propaga como eventos a Apache Kafka.

Bases de datos soportadas: PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Db2, Cassandra.

PostgreSQL — Configuración para CDC

Para habilitar CDC, PostgreSQL debe estar configurado con wal_level=logical:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cdc-postgresql
  namespace: kafka-cdc
spec:
  template:
    spec:
      containers:
        - name: postgresql
          image: registry.redhat.io/rhel9/postgresql-16:latest
          args:
            - "-c"
            - "wal_level=logical"
            - "-c"
            - "max_replication_slots=4"
            - "-c"
            - "max_wal_senders=4"

Datos de ejemplo (seed)

CREATE TABLE customers (
  id SERIAL PRIMARY KEY,
  first_name VARCHAR(100) NOT NULL,
  last_name VARCHAR(100) NOT NULL,
  email VARCHAR(255) UNIQUE NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  customer_id INTEGER REFERENCES customers(id),
  product VARCHAR(255) NOT NULL,
  amount DECIMAL(10,2) NOT NULL,
  status VARCHAR(50) DEFAULT 'pending',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE PUBLICATION cdc_publication FOR TABLE customers, orders;

KafkaConnect con Debezium

El connector de Debezium se despliega dentro de KafkaConnect usando la feature de build de Strimzi, que construye una imagen custom con los plugins necesarios:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: cdc-connect
  namespace: kafka-cdc
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: "4.0.0"
  replicas: 2
  bootstrapServers: cdc-cluster-kafka-bootstrap:9093
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/kafka-cdc/cdc-connect:latest
    plugins:
      - name: debezium-postgresql
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 2.7.3.Final
      - name: http-sink
        artifacts:
          - type: maven
            group: io.aiven
            artifact: http-connector-for-apache-kafka
            version: 0.7.0
La anotación strimzi.io/use-connector-resources: "true" permite gestionar los connectors como recursos KafkaConnector (GitOps-friendly).

KafkaConnector — Debezium Source

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgresql-source
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: cdc-postgresql
    database.port: "5432"
    database.user: cdcuser                          (1)
    database.password: cdc-demo-2024                 (1)
    database.dbname: cdcdb
    topic.prefix: cdc
    schema.include.list: public
    table.include.list: "public.customers,public.orders"
    plugin.name: pgoutput
    publication.name: cdc_publication
    slot.name: debezium_cdc
    snapshot.mode: initial
1 En el chart Helm desplegado, las credenciales se inyectan desde values.yaml via templates: {{ .Values.postgresql.username }} y {{ .Values.postgresql.password }}. Los valores mostrados aquí son los defaults del demo.

How it Works

Replicación Lógica de PostgreSQL

Debezium no hace queries SQL para capturar cambios — usa el protocolo de replicación lógica nativo de PostgreSQL:

  1. PostgreSQL escribe cada transacción en el Write-Ahead Log (WAL) antes de confirmarla a disco. Con wal_level=logical, el WAL incluye suficiente información para decodificar los cambios a nivel de fila.

  2. Debezium crea un replication slot (debezium_cdc) que actúa como un cursor persistente en el WAL. PostgreSQL no elimina segmentos WAL que el slot aún no ha consumido — esto garantiza cero pérdida de eventos.

  3. El plugin pgoutput decodifica los cambios binarios del WAL en mensajes lógicos estructurados (tabla, columnas, valores before/after).

  4. La PUBLICATION (cdc_publication) filtra qué tablas se replican — solo customers y orders generan eventos.

Estructura del evento CDC

Cada evento Debezium tiene esta estructura:

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": { "id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io" },
    "source": {
      "version": "2.7.3.Final",
      "connector": "postgresql",
      "db": "cdcdb",
      "schema": "public",
      "table": "customers",
      "lsn": 234881048,
      "txId": 550
    },
    "op": "c",
    "ts_ms": 1710000000000
  }
}
  • op: tipo de operación — c (create/INSERT), u (update), d (delete), r (read/snapshot)

  • before: estado previo de la fila (null en INSERT, populated en UPDATE/DELETE)

  • after: estado nuevo de la fila (null en DELETE)

  • source.lsn: Log Sequence Number — posición exacta en el WAL de PostgreSQL

  • source.txId: ID de la transacción — permite correlacionar múltiples cambios de la misma TX

KafkaConnect Task Distribution

KafkaConnect distribuye el trabajo mediante tasks:

  1. Cada connector declara cuántas tasks necesita (tasksMax). El source connector de Debezium siempre usa 1 task (limitación de PostgreSQL: un solo replication slot por connector).

  2. El HTTP sink puede usar múltiples tasks — una por partición del topic de entrada.

  3. Si una réplica de Connect cae, el group coordinator reasigna las tasks a las réplicas restantes en ~30 segundos.

  4. Los offsets de cada task se persisten en el topic cdc-connect-offsets (RF=3), permitiendo que otra réplica retome el trabajo desde el punto exacto.

KafkaConnector — HTTP Sink (Mailpit)

Los eventos CDC también se envían como notificaciones por email a Mailpit:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mailpit-http-sink
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.aiven.kafka.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: "cdc.public.customers,cdc.public.orders"
    http.url: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send"
    http.authorization.type: none
    http.headers.content.type: "application/json"

Dead Letter Queue (DLQ)

Los mensajes que fallan en el procesamiento no se pierden — se redirigen a topics de DLQ para análisis posterior.

DLQ en KafkaConnectors

Tanto el source como el sink connector tienen configurado errors.deadletterqueue:

config:
  errors.tolerance: all
  errors.deadletterqueue.topic.name: dlq.cdc-errors
  errors.deadletterqueue.topic.replication.factor: 3
  errors.deadletterqueue.context.headers.enable: true
  • errors.tolerance: all — el connector continúa operando aunque un registro falle

  • errors.deadletterqueue.topic.name — topic donde se envían los mensajes fallidos

  • errors.deadletterqueue.context.headers.enable — agrega headers con el error original para diagnóstico

DLQ en Apache Camel

La ruta Camel usa onException para capturar errores y enviarlos a un topic DLQ separado:

onException:
  - exceptions:
      - java.lang.Exception
    handled:
      constant: true
    steps:
      - to:
          uri: kafka:dlq.cdc-camel-errors
          parameters:
            brokers: cdc-cluster-kafka-bootstrap.kafka-cdc.svc:9092
      - log:
          message: "Error sent to DLQ: ${exception.message}"

Topics DLQ

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: dlq.cdc-errors
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 2592000000

Los topics DLQ tienen retención de 30 días (2592000000 ms) para permitir análisis y reprocesamiento.

Para inspeccionar mensajes en la DLQ:

oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dlq.cdc-errors \
  --from-beginning --max-messages 5

Probando el CDC

Conectate a la base PostgreSQL y ejecuta un INSERT:

oc exec -it deploy/cdc-postgresql -n kafka-cdc -- psql -U cdcuser -d cdcdb -c \
  "INSERT INTO customers (first_name, last_name, email) VALUES ('Demo', 'User', 'demo@test.io');"

Verifica que el evento llegó al topic:

oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc.public.customers \
  --from-beginning --max-messages 1

Documentación Oficial