CDC con Debezium
¿Qué es Debezium?
Debezium es una plataforma open source de Change Data Capture (CDC) distribuida. Captura cambios de bases de datos y los propaga como eventos a Apache Kafka.
Bases de datos soportadas: PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Db2, Cassandra.
PostgreSQL — Configuración para CDC
Para habilitar CDC, PostgreSQL debe estar configurado con wal_level=logical:
apiVersion: apps/v1
kind: Deployment
metadata:
name: cdc-postgresql
namespace: kafka-cdc
spec:
template:
spec:
containers:
- name: postgresql
image: registry.redhat.io/rhel9/postgresql-16:latest
args:
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
Datos de ejemplo (seed)
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
product VARCHAR(255) NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status VARCHAR(50) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE PUBLICATION cdc_publication FOR TABLE customers, orders;
KafkaConnect con Debezium
El connector de Debezium se despliega dentro de KafkaConnect usando la feature de build de Strimzi, que construye una imagen custom con los plugins necesarios:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: cdc-connect
namespace: kafka-cdc
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: "4.0.0"
replicas: 2
bootstrapServers: cdc-cluster-kafka-bootstrap:9093
build:
output:
type: docker
image: image-registry.openshift-image-registry.svc:5000/kafka-cdc/cdc-connect:latest
plugins:
- name: debezium-postgresql
artifacts:
- type: maven
group: io.debezium
artifact: debezium-connector-postgres
version: 2.7.3.Final
- name: http-sink
artifacts:
- type: maven
group: io.aiven
artifact: http-connector-for-apache-kafka
version: 0.7.0
La anotación strimzi.io/use-connector-resources: "true" permite gestionar los connectors como recursos KafkaConnector (GitOps-friendly).
|
KafkaConnector — Debezium Source
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-postgresql-source
namespace: kafka-cdc
labels:
strimzi.io/cluster: cdc-connect
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
database.hostname: cdc-postgresql
database.port: "5432"
database.user: cdcuser (1)
database.password: cdc-demo-2024 (1)
database.dbname: cdcdb
topic.prefix: cdc
schema.include.list: public
table.include.list: "public.customers,public.orders"
plugin.name: pgoutput
publication.name: cdc_publication
slot.name: debezium_cdc
snapshot.mode: initial
| 1 | En el chart Helm desplegado, las credenciales se inyectan desde values.yaml via templates: {{ .Values.postgresql.username }} y {{ .Values.postgresql.password }}. Los valores mostrados aquí son los defaults del demo. |
How it Works
Replicación Lógica de PostgreSQL
Debezium no hace queries SQL para capturar cambios — usa el protocolo de replicación lógica nativo de PostgreSQL:
-
PostgreSQL escribe cada transacción en el Write-Ahead Log (WAL) antes de confirmarla a disco. Con
wal_level=logical, el WAL incluye suficiente información para decodificar los cambios a nivel de fila. -
Debezium crea un replication slot (
debezium_cdc) que actúa como un cursor persistente en el WAL. PostgreSQL no elimina segmentos WAL que el slot aún no ha consumido — esto garantiza cero pérdida de eventos. -
El plugin
pgoutputdecodifica los cambios binarios del WAL en mensajes lógicos estructurados (tabla, columnas, valores before/after). -
La
PUBLICATION(cdc_publication) filtra qué tablas se replican — solocustomersyordersgeneran eventos.
Estructura del evento CDC
Cada evento Debezium tiene esta estructura:
{
"schema": { ... },
"payload": {
"before": null,
"after": { "id": 1, "first_name": "Demo", "last_name": "User", "email": "demo@test.io" },
"source": {
"version": "2.7.3.Final",
"connector": "postgresql",
"db": "cdcdb",
"schema": "public",
"table": "customers",
"lsn": 234881048,
"txId": 550
},
"op": "c",
"ts_ms": 1710000000000
}
}
-
op: tipo de operación —c(create/INSERT),u(update),d(delete),r(read/snapshot) -
before: estado previo de la fila (null en INSERT, populated en UPDATE/DELETE) -
after: estado nuevo de la fila (null en DELETE) -
source.lsn: Log Sequence Number — posición exacta en el WAL de PostgreSQL -
source.txId: ID de la transacción — permite correlacionar múltiples cambios de la misma TX
KafkaConnect Task Distribution
KafkaConnect distribuye el trabajo mediante tasks:
-
Cada connector declara cuántas tasks necesita (
tasksMax). El source connector de Debezium siempre usa 1 task (limitación de PostgreSQL: un solo replication slot por connector). -
El HTTP sink puede usar múltiples tasks — una por partición del topic de entrada.
-
Si una réplica de Connect cae, el group coordinator reasigna las tasks a las réplicas restantes en ~30 segundos.
-
Los offsets de cada task se persisten en el topic
cdc-connect-offsets(RF=3), permitiendo que otra réplica retome el trabajo desde el punto exacto.
KafkaConnector — HTTP Sink (Mailpit)
Los eventos CDC también se envían como notificaciones por email a Mailpit:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mailpit-http-sink
namespace: kafka-cdc
labels:
strimzi.io/cluster: cdc-connect
spec:
class: io.aiven.kafka.connect.http.HttpSinkConnector
tasksMax: 1
config:
topics: "cdc.public.customers,cdc.public.orders"
http.url: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send"
http.authorization.type: none
http.headers.content.type: "application/json"
Dead Letter Queue (DLQ)
Los mensajes que fallan en el procesamiento no se pierden — se redirigen a topics de DLQ para análisis posterior.
DLQ en KafkaConnectors
Tanto el source como el sink connector tienen configurado errors.deadletterqueue:
config:
errors.tolerance: all
errors.deadletterqueue.topic.name: dlq.cdc-errors
errors.deadletterqueue.topic.replication.factor: 3
errors.deadletterqueue.context.headers.enable: true
-
errors.tolerance: all— el connector continúa operando aunque un registro falle -
errors.deadletterqueue.topic.name— topic donde se envían los mensajes fallidos -
errors.deadletterqueue.context.headers.enable— agrega headers con el error original para diagnóstico
DLQ en Apache Camel
La ruta Camel usa onException para capturar errores y enviarlos a un topic DLQ separado:
onException:
- exceptions:
- java.lang.Exception
handled:
constant: true
steps:
- to:
uri: kafka:dlq.cdc-camel-errors
parameters:
brokers: cdc-cluster-kafka-bootstrap.kafka-cdc.svc:9092
- log:
message: "Error sent to DLQ: ${exception.message}"
Topics DLQ
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: dlq.cdc-errors
namespace: kafka-cdc
labels:
strimzi.io/cluster: cdc-cluster
spec:
partitions: 3
replicas: 3
config:
retention.ms: 2592000000
Los topics DLQ tienen retención de 30 días (2592000000 ms) para permitir análisis y reprocesamiento.
Para inspeccionar mensajes en la DLQ:
oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic dlq.cdc-errors \
--from-beginning --max-messages 5
Probando el CDC
Conectate a la base PostgreSQL y ejecuta un INSERT:
oc exec -it deploy/cdc-postgresql -n kafka-cdc -- psql -U cdcuser -d cdcdb -c \
"INSERT INTO customers (first_name, last_name, email) VALUES ('Demo', 'User', 'demo@test.io');"
Verifica que el evento llegó al topic:
oc exec -it cdc-cluster-kafka-0 -n kafka-cdc -- bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic cdc.public.customers \
--from-beginning --max-messages 1
También podes ver el email en Mailpit: https://n8n-mailpit-openshift-lightspeed.apps.cluster-l9nhj.dynamic.redhatworkshops.io
Documentación Oficial
-
Red Hat build of Debezium — Guía de despliegue y configuración de conectores CDC
-
Debezium Reference Documentation — Documentación completa del proyecto upstream
-
Debezium PostgreSQL Connector — Configuración específica para PostgreSQL (WAL, slots, publicaciones)
-
KafkaConnect on OpenShift — Despliegue de KafkaConnect con Strimzi
-
PostgreSQL on RHEL 9 — Configuración de PostgreSQL en Red Hat Enterprise Linux