Kaoto + Apache Camel

¿Qué es Apache Camel?

Apache Camel es un framework de integración open source que implementa los Enterprise Integration Patterns (EIP). Permite conectar sistemas diversos usando una amplia biblioteca de componentes.

Red Hat build of Apache Camel for Quarkus es la distribución soportada para OpenShift — cloud-native, arranque rápido, nativa GraalVM.

Red Hat Camel K (serverless) alcanzó End of Life en junio 2025. La ruta de migración es Camel for Quarkus, que ofrece el mismo modelo de rutas pero con deployment tradicional.

¿Qué es Kaoto?

Kaoto es un diseñador visual low-code para rutas Apache Camel. Permite crear y editar integraciones arrastrando componentes — sin necesidad de escribir YAML/XML manualmente.

Kaoto está disponible como extensión VS Code (redhat.vscode-kaoto) y viene pre-instalado en los workspaces de DevSpaces de este cluster.

Ruta Camel del CDC Pipeline

La ruta de integración consume eventos CDC de Kafka y envía notificaciones por email a Mailpit:

- route:
    id: cdc-customer-notification
    from:
      uri: kafka:cdc.public.customers
      parameters:
        brokers: cdc-cluster-kafka-bootstrap.kafka-cdc.svc:9092
        groupId: camel-cdc-consumer
        autoOffsetReset: latest
    steps:
      - log:
          message: "CDC event received: ${body}"
      - unmarshal:
          json:
            library: jackson
      - choice:
          when:
            - jsonpath:
                expression: "$.payload.op"
              simple: "${body[payload][op]} == 'c'"
              steps:
                - setHeader:
                    name: subject
                    constant: "New Customer Created"
                - setBody:
                    simple: |
                      {"from":{"name":"CDC Pipeline","email":"cdc@neuralbank.io"},
                       "to":[{"name":"Admin","email":"admin@neuralbank.io"}],
                       "subject":"New Customer Created",
                       "text":"Customer created in the database."}
          otherwise:
            steps:
              - setBody:
                  simple: |
                    {"from":{"name":"CDC Pipeline","email":"cdc@neuralbank.io"},
                     "to":[{"name":"Admin","email":"admin@neuralbank.io"}],
                     "subject":"Customer Record Changed",
                     "text":"A change was detected on the customers table."}
      - marshal:
          json:
            library: jackson
      - to:
          uri: http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send

Usando Kaoto en DevSpaces

  1. Abrí DevSpaces: https://devspaces.apps.cluster-l9nhj.dynamic.redhatworkshops.io

  2. Crea un workspace nuevo o abrí uno existente

  3. La extensión Kaoto está pre-instalada automáticamente

  4. Abrí cualquier archivo .camel.yaml — Kaoto muestra la vista visual

  5. Podes arrastrar componentes, modificar parámetros y guardar — el YAML se actualiza automáticamente

Componentes Camel usados en este pipeline

Componente Uso

camel-kafka

Consume mensajes del topic CDC

camel-jackson

Marshal/Unmarshal JSON

camel-http

HTTP POST a Mailpit API

camel-jsonpath

Evaluación de expresiones JSONPath para content-based routing

camel-micrometer

Exporta métricas Prometheus del procesamiento

How it Works

Procesamiento de un evento CDC en Camel

Cuando un nuevo evento CDC llega al topic cdc.public.customers, la ruta Camel ejecuta los siguientes pasos internos:

  1. Poll — El componente camel-kafka hace poll() al broker líder de las particiones asignadas al consumer group camel-cdc-consumer. Kafka devuelve un batch de registros. Cada registro se procesa individualmente como un Exchange de Camel.

  2. Unmarshal — El body del Exchange (JSON string del evento Debezium) se deserializa a un Map<String,Object> usando Jackson. Esto permite acceder a campos como payload.op, payload.after.first_name, etc.

  3. Content-Based Routing — El choice() evalúa $.payload.op mediante JSONPath:

    • Si op == 'c' (INSERT): genera un email de "New Customer Created"

    • Para cualquier otra operación (UPDATE, DELETE): genera un email genérico de cambio

  4. Marshal + HTTP POST — El body transformado se serializa de vuelta a JSON y se envía via HTTP POST al API de Mailpit (/api/v1/send). El componente camel-http gestiona connection pooling, retries y timeouts.

  5. Error Handling — Si cualquier paso falla, el onException captura la excepción:

    • Marca el error como handled (no propaga el fallo upstream)

    • Envía el mensaje original al topic DLQ dlq.cdc-camel-errors

    • Logea el error para diagnóstico

    • El offset de Kafka se commitea — el mensaje no se reprocesa

Camel Exchange Model

Cada mensaje procesado por Camel es un Exchange con:

  • Body: el contenido del mensaje (transformado en cada step)

  • Headers: metadata (topic, partition, offset, timestamp de Kafka, headers HTTP para el POST)

  • Properties: estado interno del Exchange (no visible para componentes externos)

El Exchange fluye por la ruta como un pipeline — la salida de cada step es el input del siguiente. Esto permite encadenar transformaciones sin estado compartido.

Métricas del procesador Camel

El procesador Camel expone métricas Prometheus en /q/metrics:

curl -s http://camel-cdc-processor.kafka-cdc.svc:8080/q/metrics | grep camel

Estas métricas son recogidas por el PodMonitor y visualizadas en el dashboard de Grafana "Kafka CDC Pipeline".

Documentación Oficial