Verificación Operativa del Cluster

Esta página documenta el estado operativo del pipeline CDC completo desplegado en el cluster OpenShift, incluyendo capturas de pantalla de cada componente y los bloques YAML que definen la configuración.

Estado General del Namespace kafka-cdc

Todos los componentes del pipeline CDC están operativos en el namespace kafka-cdc:

Componente Pods Estado Función

Kafka Cluster (cdc-cluster)

3 brokers

Ready (KRaft)

Cluster Kafka en modo KRaft con 3 brokers/controllers

KafkaConnect (cdc-connect)

2 replicas

Ready

Framework de conectores con Debezium y HTTP Sink

PostgreSQL (cdc-postgresql)

1

Running

Base de datos fuente con WAL level logical

Kafka Bridge (cdc-bridge)

1

Ready

API REST HTTP para producir/consumir mensajes

Apicurio Registry

1

Running

Schema Registry para gobierno de esquemas

Kafka Console

2 (UI + API)

Running

Consola web para monitoreo de topics y mensajes

Kafka Exporter

1

Running

Exporta métricas Kafka a Prometheus

Entity Operator

2 containers

Running

Gestiona topics y usuarios vía CRDs

Kafka Cluster — KRaft Mode

El cluster Kafka opera en modo KRaft (sin ZooKeeper) con 3 nodos que cumplen roles de broker y controller simultáneamente.

Definición del KafkaNodePool
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  replicas: 3
  roles:
    - broker
    - controller
  storage:
    type: persistent-claim
    size: 10Gi
    deleteClaim: false
Definición del Kafka CR con KRaft habilitado
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: cdc-cluster
  namespace: kafka-cdc
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: "4.0.0"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2

Streams for Apache Kafka Console

La consola de Streams permite visualizar el cluster, topics, consumer groups y mensajes individuales con su payload CDC.

Kafka Console - Cluster Overview

El overview del cluster muestra: 3/3 brokers online, Kafka 4.0.0, 8 topics, 44 particiones, y 2 consumer groups activos.

Kafka Console - CDC Messages

Los mensajes en el topic cdc.public.customers muestran payloads de Debezium con schema completo y datos after de cada registro.

Definición del Console CR
apiVersion: console.streamshub.github.com/v1alpha1
kind: Console
metadata:
  name: kafka-console
  namespace: kafka-cdc
spec:
  hostname: kafka-console-kafka-cdc.apps.cluster-l9nhj.dynamic.redhatworkshops.io
  kafkaClusters:
    - name: cdc-cluster
      namespace: kafka-cdc
      listener: plain

KafkaConnect con Debezium y HTTP Sink

KafkaConnect ejecuta 2 replicas con plugins de Debezium PostgreSQL 3.0.2 y Aiven HTTP Sink 0.9.0, ambos compilados como imagen custom via OpenShift Build.

Definición del KafkaConnect con build de plugins
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: cdc-connect
  namespace: kafka-cdc
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: "4.0.0"
  replicas: 2
  bootstrapServers: cdc-cluster-kafka-bootstrap:9093
  authentication:
    type: scram-sha-512
    username: cdc-user
    passwordSecret:
      secretName: cdc-user
      password: password
  tls:
    trustedCertificates:
      - secretName: cdc-cluster-cluster-ca-cert
        certificate: ca.crt
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/kafka-cdc/cdc-connect:latest
    plugins:
      - name: debezium-postgresql
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.2.Final/debezium-connector-postgres-3.0.2.Final-plugin.zip
      - name: http-sink
        artifacts:
          - type: zip
            url: https://github.com/Aiven-Open/http-connector-for-apache-kafka/releases/download/v0.9.0/http-connector-for-apache-kafka-0.9.0.zip

Debezium PostgreSQL Source Connector

Captura cambios en las tablas customers y orders usando logical replication con pgoutput.

Definición del KafkaConnector Debezium
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgresql-source
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: cdc-postgresql
    database.port: "5432"
    database.dbname: cdcdb
    topic.prefix: cdc
    schema.include.list: public
    table.include.list: "public.customers,public.orders"
    plugin.name: pgoutput
    publication.name: cdc_publication
    slot.name: debezium_cdc
    snapshot.mode: initial
    errors.tolerance: all
    errors.deadletterqueue.topic.name: dlq.cdc-errors
    errors.deadletterqueue.topic.replication.factor: 3
    errors.deadletterqueue.context.headers.enable: true

HTTP Sink Connector — Mailpit Notifications

Reenvía los eventos CDC a Mailpit para notificaciones por email.

Mailpit - CDC Notifications
Definición del KafkaConnector HTTP Sink
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mailpit-http-sink
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-connect
spec:
  class: io.aiven.kafka.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: "cdc.public.customers,cdc.public.orders"
    http.url: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send"
    http.authorization.type: none
    http.headers.content.type: "application/json"
    batching.enabled: "false"
    errors.tolerance: all
    errors.deadletterqueue.topic.name: dlq.cdc-errors
    errors.deadletterqueue.topic.replication.factor: 3
    errors.deadletterqueue.context.headers.enable: true

Apicurio Registry

Apicurio Registry

Red Hat build of Apicurio Registry desplegado para gobierno de esquemas (Avro, Protobuf, JSON Schema).

KafkaUser — Autenticación SCRAM-SHA-512

Definición del KafkaUser
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: cdc-user
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  authentication:
    type: scram-sha-512

Topics Kafka

Topic Particiones Factor de Réplica Propósito

cdc.public.customers

3

3

Eventos CDC de la tabla customers

cdc.public.orders

3

3

Eventos CDC de la tabla orders

dlq.cdc-errors

3

3

Dead Letter Queue para errores de conectores

dlq.cdc-camel-errors

3

3

Dead Letter Queue para errores de Camel

Definición de un KafkaTopic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: cdc.public.customers
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
    cleanup.policy: compact

PostgreSQL con WAL Level Logical

La base de datos fuente utiliza PostgreSQL 16 con WAL level logical configurado via ConfigMap:

ConfigMap de configuración PostgreSQL
apiVersion: v1
kind: ConfigMap
metadata:
  name: cdc-postgresql-cfg
  namespace: kafka-cdc
data:
  custom.conf: |
    wal_level = logical
    max_replication_slots = 4
    max_wal_senders = 4

El volumen se monta en /opt/app-root/src/postgresql-cfg para que la imagen Red Hat PostgreSQL 16 lo aplique automáticamente.

NetworkPolicy

La NetworkPolicy restringe el tráfico de ingreso al namespace, permitiendo solo comunicación interna y probes de health check:

NetworkPolicy del namespace kafka-cdc
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: kafka-cdc-allow-internal
  namespace: kafka-cdc
spec:
  podSelector: {}
  policyTypes:
    - Ingress
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kafka-cdc
    - from:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: openshift-cluster-observability-operator
      ports:
        - protocol: TCP
          port: 9404
    - ports:
        - protocol: TCP
          port: 3000
        - protocol: TCP
          port: 8080
        - protocol: TCP
          port: 8083
        - protocol: TCP
          port: 9090
        - protocol: TCP
          port: 9092
        - protocol: TCP
          port: 9093
        - protocol: TCP
          port: 5432

Verificación End-to-End

Para validar el pipeline completo, insertar un registro en PostgreSQL:

INSERT INTO customers (first_name, last_name, email)
VALUES ('Test', 'User', 'test@neuralbank.io');

El flujo completo es:

  1. PostgreSQL → Debezium captura el WAL change

  2. Debezium → Publica al topic cdc.public.customers

  3. Kafka → Almacena con 3 replicas y compaction

  4. HTTP Sink → Envía el evento a Mailpit

  5. Mailpit → Muestra la notificación en el inbox

Documentación Oficial