BPM con Kogito / SonataFlow

Introducción

Kogito es la tecnología de automatización de procesos de negocio (BPM) cloud-native de Red Hat, basada en los proyectos Drools y jBPM. En OpenShift, la evolución moderna de Kogito es SonataFlow — un motor de workflows serverless que ejecuta procesos definidos en el estándar Serverless Workflow Specification.

El operador OpenShift Serverless Logic (logic-operator-rhel8) gestiona el ciclo de vida completo de los workflows SonataFlow en OpenShift.

¿Por qué BPM en el Pipeline CDC?

Sin BPM Con BPM (SonataFlow)

Los eventos CDC se procesan linealmente

Los eventos CDC disparan procesos de negocio con lógica condicional

No hay aprobaciones ni SLAs

Aprobaciones con timeout (24h SLA para clientes premium)

Notificaciones simples (fire-and-forget)

Orquestación paralela: provisionar + notificar simultáneamente

Sin clasificación de clientes

Clasificación automática: basic / standard / premium

Sin métricas de proceso

Métricas de onboarding emitidas a Kafka

Arquitectura

Diagram

Operador: OpenShift Serverless Logic

El operador se instala desde Red Hat Operators:

Subscription del operador
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: logic-operator-rhel8
  namespace: openshift-serverless-logic
spec:
  channel: alpha
  installPlanApproval: Automatic
  name: logic-operator-rhel8
  source: redhat-operators
  sourceNamespace: openshift-marketplace

El operador proporciona los siguientes CRDs:

  • SonataFlow — Define un workflow serverless

  • SonataFlowPlatform — Configura la plataforma (build, runtime)

  • SonataFlowBuild — Gestiona builds de workflows

  • SonataFlowClusterPlatform — Configuración a nivel cluster

SonataFlow Platform

La plataforma se configura en el namespace kogito-bpm:

apiVersion: sonataflow.org/v1alpha08
kind: SonataFlowPlatform
metadata:
  name: sonataflow-platform
  namespace: kogito-bpm
spec:
  build:
    config:
      strategyOptions:
        KanikoBuildCacheEnabled: "true"
  devMode:
    baseImage: registry.redhat.io/openshift-serverless-1/logic-swf-devmode-rhel8:1.36

Workflow: Customer Onboarding

El proceso de onboarding de clientes se define como un SonataFlow CR usando la especificación Serverless Workflow:

Diagrama del Proceso

┌──────────────────┐
│  CDC Event        │
│  (Kafka Topic)    │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Validate Customer │
│ Data              │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Classify Customer │
│ (basic/standard/  │
│  premium)         │
└────────┬─────────┘
         │
    ┌────┴────┐
    │ Premium? │
    └────┬────┘
   Yes   │   No
┌────────▼──┐  ┌──▼──────────┐
│ Manager    │  │ Auto-       │
│ Approval   │  │ Approved    │
│ (24h SLA)  │  │             │
└────────┬──┘  └──┬──────────┘
         │        │
┌────────▼────────▼┐
│ Provision Account │
│ (parallel)        │
│ ├─ Create Account │
│ └─ Send Welcome   │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Emit Onboarding   │
│ Metric            │
└──────────────────┘

Estados del Workflow

Estado Tipo Descripción

ReceiveCDCEvent

Event

Escucha eventos CDC del topic cdc.public.customers via CloudEvents

ValidateCustomerData

Switch

Valida que el evento contenga email y first_name

ClassifyCustomer

Operation

Clasifica el tier del cliente: basic (<10k), standard (10k-50k), premium (>50k)

CheckApprovalRequired

Switch

Evalúa si el tier requiere aprobación manual

RequestManagerApproval

Callback

Envía solicitud de aprobación al topic bpm.approval.requests via Kafka Bridge. Espera evento de respuesta con timeout de 24 horas

EvaluateApproval

Switch

Evalúa la respuesta de aprobación

ProvisionAccount

Parallel

Ejecuta en paralelo: crear cuenta + enviar email de bienvenida via Mailpit

EmitOnboardingMetric

Operation

Publica métrica de onboarding al topic bpm.onboarding.metrics

Definición del SonataFlow CR

Eventos (Kafka CDC como CloudEvents)
spec:
  flow:
    start: ReceiveCDCEvent
    events:
      - name: cdcCustomerEvent
        type: cdc.public.customers
        source: debezium/cdc
      - name: approvalCompleted
        type: approval.completed
        source: internal
Funciones (clasificación, REST, logs)
    functions:
      - name: classifyCustomer
        type: expression
        operation: |
          .customer | if .amount >= 50000 then "premium"
                    elif .amount >= 10000 then "standard"
                    else "basic" end
      - name: sendApprovalRequest
        type: rest
        operation: "http://cdc-bridge-bridge-service.kafka-cdc.svc:8080/topics/bpm.approval.requests#POST"
      - name: sendWelcomeNotification
        type: rest
        operation: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send#POST"
      - name: publishMetrics
        type: rest
        operation: "http://cdc-bridge-bridge-service.kafka-cdc.svc:8080/topics/bpm.onboarding.metrics#POST"
Estado de aprobación con callback y timeout
      - name: RequestManagerApproval
        type: callback
        action:
          name: requestApproval
          functionRef:
            refName: sendApprovalRequest
          actionDataFilter:
            fromStateData: |
              {
                "records": [{
                  "value": {
                    "customer_id": .customer.id,
                    "name": "\(.customer.first_name) \(.customer.last_name)",
                    "email": .customer.email,
                    "tier": .tier,
                    "status": "pending_approval"
                  }
                }]
              }
        eventRef: approvalCompleted
        timeouts:
          eventTimeout: PT24H
        transition: EvaluateApproval
Ejecución paralela: provisionar + notificar
      - name: ProvisionAccount
        type: parallel
        branches:
          - name: CreateAccount
            actions:
              - name: logProvisioning
                functionRef:
                  refName: logEvent
          - name: SendWelcome
            actions:
              - name: sendWelcome
                functionRef:
                  refName: sendWelcomeNotification
                actionDataFilter:
                  fromStateData: |
                    {
                      "from": {"name": "NeuralBank Onboarding", "email": "onboarding@neuralbank.io"},
                      "to": [{"name": "\(.customer.first_name)", "email": "\(.customer.email)"}],
                      "subject": "Welcome to NeuralBank - \(.tier) Account",
                      "text": "Hello \(.customer.first_name)! Your \(.tier) account has been created."
                    }
        completionType: allOf

Topics Kafka para BPM

El módulo BPM agrega dos topics adicionales al cluster Kafka:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: bpm.approval.requests
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: bpm.onboarding.metrics
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 2592000000

Integración con el Pipeline CDC

El workflow SonataFlow se integra con los componentes existentes del pipeline CDC:

Componente Integración Protocolo

Debezium → Kafka

Produce eventos CDC que disparan el workflow

CloudEvents sobre Kafka

Kafka Bridge

El workflow publica aprobaciones y métricas via REST

HTTP → Kafka

Mailpit

El workflow envía emails de bienvenida y rechazo

HTTP REST

Apicurio Registry

Gobierno de esquemas de los topics BPM

REST

Grafana

Visualización de métricas de onboarding

Prometheus scraping

How it Works

Motor de ejecución SonataFlow

SonataFlow ejecuta workflows definidos en la Serverless Workflow Specification como microservicios Quarkus:

  1. El operador recibe el CR SonataFlow y genera un Quarkus application con el runtime del workflow embebido.

  2. Al recibir un evento CDC (CloudEvent via Kafka o HTTP), el motor crea una instancia del workflow con un ID único.

  3. Cada instancia mantiene su propio estado (state data) — un JSON document que se transforma en cada transición.

  4. El motor ejecuta las transiciones definidas en el workflow de forma event-driven: cada estado se evalúa y ejecuta en orden, usando el resultado del anterior como input.

Callback pattern: aprobaciones asíncronas

El estado RequestManagerApproval usa el callback pattern de la especificación:

  1. El workflow ejecuta la action (enviar solicitud de aprobación al topic bpm.approval.requests via Kafka Bridge).

  2. El motor suspende la instancia del workflow — no consume CPU ni memoria mientras espera.

  3. La instancia queda persistida en el Data Index con estado WAITING.

  4. Cuando llega el evento approvalCompleted (un mensaje en el topic con el workflowInstanceId correcto), el motor resume la instancia desde el punto exacto donde se suspendió.

  5. Si el evento no llega en PT24H (24 horas), el timeout dispara y el workflow transiciona al siguiente estado con un flag de timeout.

Ejecución paralela: branches

El estado ProvisionAccount ejecuta branches en paralelo:

  1. El motor lanza ambas ramas (CreateAccount y SendWelcome) simultáneamente como coroutines.

  2. completionType: allOf indica que el workflow espera a que ambas ramas terminen antes de transicionar.

  3. Si una rama falla, el workflow puede reintentar o manejar el error según la política definida.

  4. Los resultados de ambas ramas se fusionan en el state data de la instancia.

Data Index: consultas de estado

El Data Index Service escucha eventos de estado del motor via CloudEvents e indexa toda la información de instancias en PostgreSQL. Expone un endpoint GraphQL que permite consultar:

  • Instancias activas, completadas y con error

  • El state data actual de cada instancia

  • Historial de transiciones (qué estados se ejecutaron y cuándo)

  • Variables del proceso (customer data, tier, approval status)

La Management Console consume este GraphQL para mostrar dashboards operacionales.

Caso de Uso: Onboarding en Banca

El flujo completo para un banco (NeuralBank):

  1. Un operador bancario crea un nuevo cliente en la aplicación (INSERT en PostgreSQL)

  2. Debezium captura el cambio y publica al topic cdc.public.customers

  3. SonataFlow recibe el evento CDC como CloudEvent

  4. El workflow valida los datos del cliente

  5. Clasifica al cliente según monto: basic / standard / premium

  6. Para clientes premium: solicita aprobación al gerente con SLA de 24h

  7. Al aprobar: provisiona la cuenta y envía email de bienvenida en paralelo

  8. Publica métricas de onboarding para dashboards operacionales

Prueba End-to-End

-- Crear cliente basic (auto-approved)
INSERT INTO customers (first_name, last_name, email)
VALUES ('Juan', 'Basic', 'juan.basic@neuralbank.io');

-- Crear cliente premium (requires approval)
INSERT INTO customers (first_name, last_name, email)
VALUES ('Maria', 'Premium', 'maria.premium@neuralbank.io');

Desarrollo Visual con Kaoto

Los workflows SonataFlow se pueden diseñar visualmente usando la extensión Kaoto en Red Hat OpenShift DevSpaces:

  1. Abrir DevSpaces con el workspace que contiene el proyecto

  2. Instalar la extensión Kaoto desde el marketplace de VS Code

  3. Abrir el archivo .sw.yaml del workflow

  4. El editor visual muestra el diagrama BPMN interactivo

  5. Arrastrar y soltar estados, eventos y funciones

  6. El YAML se genera automáticamente

Esta integración permite que analistas de negocio y desarrolladores colaboren en la definición de procesos usando la misma herramienta.

Documentación Oficial

Comparación: Kogito vs RHPAM vs SonataFlow

Característica RHPAM 7.x (legacy) Kogito SonataFlow

Runtime

Wildfly/EAP

Quarkus/Spring Boot

Quarkus (serverless)

Definición de procesos

BPMN 2.0

BPMN 2.0 / DMN

Serverless Workflow Spec

Despliegue

Business Central + KIE Server

Kubernetes Operator

SonataFlow Operator

Escalabilidad

Horizontal manual

Cloud-native auto-scaling

Knative serverless (scale-to-zero)

Kafka integration

Via JMS/AMQP bridge

Nativo (Quarkus Kafka)

CloudEvents nativo

Visual designer

Business Central

VS Code Extension

Kaoto + VS Code

Estado

Deprecated (migrar a SonataFlow)

Maduro → migrar a SonataFlow

Recomendado para nuevos proyectos