BPM with Kogito / SonataFlow

Introduction

Kogito is Red Hat’s cloud-native business process management (BPM) technology, based on the Drools and jBPM projects. On OpenShift, the modern evolution of Kogito is SonataFlow, a serverless workflow engine that runs processes defined with the Serverless Workflow Specification.

The OpenShift Serverless Logic operator (logic-operator-rhel8) manages the full lifecycle of SonataFlow workflows on OpenShift.

Why BPM in the CDC Pipeline?

| Without BPM | With BPM (SonataFlow) |
|---|---|
| CDC events are processed linearly | CDC events trigger business processes with conditional logic |
| No approvals or SLAs | Approvals with timeout (24h SLA for premium customers) |
| Simple notifications (fire-and-forget) | Parallel orchestration: provision + notify at the same time |
| No customer classification | Automatic classification: basic / standard / premium |
| No process metrics | Onboarding metrics emitted to Kafka |

Architecture

Diagram

Operator: OpenShift Serverless Logic

Install the operator from Red Hat Operators:

Operator subscription
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: logic-operator-rhel8
  namespace: openshift-serverless-logic
spec:
  channel: alpha
  installPlanApproval: Automatic
  name: logic-operator-rhel8
  source: redhat-operators
  sourceNamespace: openshift-marketplace

The operator provides the following CRDs:

  • SonataFlow — Defines a serverless workflow

  • SonataFlowPlatform — Configures the platform (build, runtime)

  • SonataFlowBuild — Manages workflow builds

  • SonataFlowClusterPlatform — Cluster-level configuration

SonataFlow Platform

Configure the platform in the kogito-bpm namespace:

apiVersion: sonataflow.org/v1alpha08
kind: SonataFlowPlatform
metadata:
  name: sonataflow-platform
  namespace: kogito-bpm
spec:
  build:
    config:
      strategyOptions:
        KanikoBuildCacheEnabled: "true"
  devMode:
    baseImage: registry.redhat.io/openshift-serverless-1/logic-swf-devmode-rhel8:1.36

Workflow: Customer Onboarding

The customer onboarding process is defined as a SonataFlow CR using the Serverless Workflow specification:

Process Diagram

┌──────────────────┐
│  CDC Event        │
│  (Kafka Topic)    │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Validate Customer │
│ Data              │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Classify Customer │
│ (basic/standard/  │
│  premium)         │
└────────┬─────────┘
         │
    ┌────┴────┐
    │ Premium? │
    └────┬────┘
   Yes   │   No
┌────────▼──┐  ┌──▼──────────┐
│ Manager    │  │ Auto-       │
│ Approval   │  │ Approved    │
│ (24h SLA)  │  │             │
└────────┬──┘  └──┬──────────┘
         │        │
┌────────▼────────▼┐
│ Provision Account │
│ (parallel)        │
│ ├─ Create Account │
│ └─ Send Welcome   │
└────────┬─────────┘
         │
┌────────▼─────────┐
│ Emit Onboarding   │
│ Metric            │
└──────────────────┘

Workflow States

| State | Type | Description |
|---|---|---|
| ReceiveCDCEvent | Event | Listens for CDC events from topic cdc.public.customers via CloudEvents |
| ValidateCustomerData | Switch | Validates that the event contains email and first_name |
| ClassifyCustomer | Operation | Classifies customer tier by amount: basic (<10k), standard (10k to <50k), premium (>=50k) |
| CheckApprovalRequired | Switch | Evaluates whether the tier requires manual approval |
| RequestManagerApproval | Callback | Sends approval request to topic bpm.approval.requests via Kafka Bridge, then waits for the response event with a 24-hour timeout |
| EvaluateApproval | Switch | Evaluates the approval response |
| ProvisionAccount | Parallel | Runs in parallel: create account + send welcome email via Mailpit |
| EmitOnboardingMetric | Operation | Publishes onboarding metric to topic bpm.onboarding.metrics |

SonataFlow CR Definition

Events (Kafka CDC as CloudEvents)
spec:
  flow:
    start: ReceiveCDCEvent
    events:
      - name: cdcCustomerEvent
        type: cdc.public.customers
        source: debezium/cdc
      - name: approvalCompleted
        type: approval.completed
        source: internal
Functions (classification, REST, logs)
    functions:
      - name: classifyCustomer
        type: expression
        operation: |
          .customer | if .amount >= 50000 then "premium"
                    elif .amount >= 10000 then "standard"
                    else "basic" end
      - name: sendApprovalRequest
        type: rest
        operation: "http://cdc-bridge-bridge-service.kafka-cdc.svc:8080/topics/bpm.approval.requests#POST"
      - name: sendWelcomeNotification
        type: rest
        operation: "http://n8n-mailpit.openshift-lightspeed.svc:8025/api/v1/send#POST"
      - name: publishMetrics
        type: rest
        operation: "http://cdc-bridge-bridge-service.kafka-cdc.svc:8080/topics/bpm.onboarding.metrics#POST"
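
The thresholds in the classifyCustomer expression can be checked outside the workflow. The following is a minimal Python sketch that mirrors the jq logic above; the function name classify_customer is hypothetical, and it assumes amount is a number or missing.

```python
def classify_customer(amount):
    """Return the tier for a customer amount, mirroring the jq thresholds."""
    # A missing amount falls through to "basic", because in jq
    # `null >= 50000` and `null >= 10000` both evaluate to false.
    if amount is not None and amount >= 50000:
        return "premium"
    if amount is not None and amount >= 10000:
        return "standard"
    return "basic"


if __name__ == "__main__":
    # Boundary values: 10000 and 50000 belong to the higher tier.
    for amount in (None, 9999, 10000, 49999, 50000):
        print(amount, classify_customer(amount))
```

Note that both boundaries are inclusive on the upper tier: an amount of exactly 50000 is premium and exactly 10000 is standard.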
Approval state with callback and timeout
      - name: RequestManagerApproval
        type: callback
        action:
          name: requestApproval
          functionRef:
            refName: sendApprovalRequest
          actionDataFilter:
            fromStateData: |
              {
                "records": [{
                  "value": {
                    "customer_id": .customer.id,
                    "name": "\(.customer.first_name) \(.customer.last_name)",
                    "email": .customer.email,
                    "tier": .tier,
                    "status": "pending_approval"
                  }
                }]
              }
        eventRef: approvalCompleted
        timeouts:
          eventTimeout: PT24H
        transition: EvaluateApproval
Parallel execution: provision + notify
      - name: ProvisionAccount
        type: parallel
        branches:
          - name: CreateAccount
            actions:
              - name: logProvisioning
                functionRef:
                  refName: logEvent
          - name: SendWelcome
            actions:
              - name: sendWelcome
                functionRef:
                  refName: sendWelcomeNotification
                actionDataFilter:
                  fromStateData: |
                    {
                      "from": {"name": "NeuralBank Onboarding", "email": "onboarding@neuralbank.io"},
                      "to": [{"name": "\(.customer.first_name)", "email": "\(.customer.email)"}],
                      "subject": "Welcome to NeuralBank - \(.tier) Account",
                      "text": "Hello \(.customer.first_name)! Your \(.tier) account has been created."
                    }
        completionType: allOf
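
To test the approval topic without running the workflow, the request that RequestManagerApproval sends can be reproduced by hand. The sketch below builds the same records envelope in Python. The Content-Type header is an assumption (the source does not show it; it is the JSON embedded format commonly used with the Strimzi Kafka Bridge), and the sample customer values are made up.

```python
import json
import urllib.request

# Same bridge endpoint as the sendApprovalRequest function.
BRIDGE_URL = (
    "http://cdc-bridge-bridge-service.kafka-cdc.svc:8080"
    "/topics/bpm.approval.requests"
)


def build_approval_request(state):
    """Build (but do not send) the POST request for an approval record."""
    customer = state["customer"]
    body = {
        "records": [{
            "value": {
                "customer_id": customer["id"],
                "name": f'{customer["first_name"]} {customer["last_name"]}',
                "email": customer["email"],
                "tier": state["tier"],
                "status": "pending_approval",
            }
        }]
    }
    return urllib.request.Request(
        BRIDGE_URL,
        data=json.dumps(body).encode("utf-8"),
        # Assumption: JSON embedded format of the Kafka Bridge.
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical state data, shaped like the workflow's state document.
    sample = {
        "customer": {"id": 42, "first_name": "Maria", "last_name": "Premium",
                     "email": "maria.premium@neuralbank.io"},
        "tier": "premium",
    }
    request = build_approval_request(sample)
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```

Passing the returned object to urllib.request.urlopen from inside the cluster would publish the record; the sketch stops short of that so it can run anywhere.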

Kafka Topics for BPM

The BPM module adds two topics to the Kafka cluster:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: bpm.approval.requests
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: bpm.onboarding.metrics
  namespace: kafka-cdc
  labels:
    strimzi.io/cluster: cdc-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 2592000000
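
The retention.ms values above are easier to review when converted to days. A small Python sketch of the arithmetic, using only the two values from the manifests:

```python
from datetime import timedelta

# retention.ms values copied from the two KafkaTopic manifests.
RETENTION_MS = {
    "bpm.approval.requests": 604800000,
    "bpm.onboarding.metrics": 2592000000,
}


def retention_days(retention_ms):
    """Convert a Kafka retention.ms value to whole days."""
    return timedelta(milliseconds=retention_ms).days


if __name__ == "__main__":
    for topic, ms in RETENTION_MS.items():
        print(f"{topic}: {retention_days(ms)} days")
```

Approval requests are therefore kept for 7 days and onboarding metrics for 30 days.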

Integration with the CDC Pipeline

The SonataFlow workflow integrates with the existing CDC pipeline components:

| Component | Integration | Protocol |
|---|---|---|
| Debezium → Kafka | Produces CDC events that trigger the workflow | CloudEvents over Kafka |
| Kafka Bridge | The workflow publishes approvals and metrics via REST | HTTP → Kafka |
| Mailpit | The workflow sends welcome and rejection emails | HTTP REST |
| Apicurio Registry | Schema governance for BPM topics | REST |
| Grafana | Onboarding metrics visualization | Prometheus scraping |

How it Works

SonataFlow execution engine

SonataFlow runs workflows defined with the Serverless Workflow Specification as Quarkus microservices:

  1. The operator receives the SonataFlow CR and generates a Quarkus application with the workflow runtime embedded.

  2. When a CDC event arrives (CloudEvent via Kafka or HTTP), the engine creates a workflow instance with a unique ID.

  3. Each instance maintains its own state data, a JSON document that is transformed on each transition.

  4. The engine runs the transitions defined in the workflow in an event-driven way: each state is evaluated and executed in order, using the previous result as input.
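
The steps above can be pictured with a toy model. The Python sketch below is not the SonataFlow engine: it is a simplified illustration, under stated assumptions, of how one instance carries a JSON-like document through successive states. The state names come from the workflow; the valid and needs_approval keys and the run_instance helper are hypothetical.

```python
import uuid


def validate(data):
    """ValidateCustomerData: require email and first_name."""
    customer = data.get("customer", {})
    ok = bool(customer.get("email")) and bool(customer.get("first_name"))
    # Stop the instance when validation fails (next state is None).
    return {**data, "valid": ok}, ("ClassifyCustomer" if ok else None)


def classify(data):
    """ClassifyCustomer: add the tier to the state data."""
    amount = data["customer"].get("amount") or 0
    tier = ("premium" if amount >= 50000
            else "standard" if amount >= 10000 else "basic")
    return {**data, "tier": tier}, "CheckApprovalRequired"


def check_approval(data):
    """CheckApprovalRequired: the sketch ends here instead of branching on."""
    return {**data, "needs_approval": data["tier"] == "premium"}, None


STATES = {
    "ValidateCustomerData": validate,
    "ClassifyCustomer": classify,
    "CheckApprovalRequired": check_approval,
}


def run_instance(event_data, start="ValidateCustomerData"):
    """Create an instance with a unique ID and run states in order."""
    instance_id = str(uuid.uuid4())
    data, current, history = dict(event_data), start, []
    while current is not None:
        history.append(current)
        # Each state receives the previous state's output as its input.
        data, current = STATES[current](data)
    return instance_id, data, history


if __name__ == "__main__":
    event = {"customer": {"first_name": "Maria",
                          "email": "maria.premium@neuralbank.io",
                          "amount": 75000}}
    print(run_instance(event))
```

Each state returns a new document rather than mutating the old one, which makes the per-transition transformation of the state data easy to trace.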

Callback pattern: asynchronous approvals

The RequestManagerApproval state uses the specification’s callback pattern:

  1. The workflow runs the action (send approval request to topic bpm.approval.requests via Kafka Bridge).

  2. The engine suspends the workflow instance — it does not consume CPU or memory while waiting.

  3. The instance is persisted in the Data Index with status WAITING.

  4. When the approvalCompleted event arrives (a message on the topic with the correct workflowInstanceId), the engine resumes the instance from the exact point where it suspended.

  5. If the event does not arrive within PT24H (24 hours), the timeout fires and the workflow transitions to the next state with a timeout flag.
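
For testing, the resume event from step 4 can be built by hand. The Python sketch below constructs a structured-mode CloudEvent with the type and source declared for approvalCompleted, and computes the PT24H deadline. Two things are assumptions and should be checked against the deployed runtime: the instance ID is carried in a kogitoprocrefid extension attribute (the attribute Kogito-based runtimes are understood to use for callback correlation), and the data payload uses a hypothetical approved field.

```python
import json
import uuid
from datetime import datetime, timedelta, timezone

EVENT_TIMEOUT = timedelta(hours=24)  # eventTimeout: PT24H


def build_approval_completed(instance_id, approved):
    """Build the CloudEvent that resumes a waiting workflow instance."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "type": "approval.completed",   # from the events definition
        "source": "internal",           # from the events definition
        "datacontenttype": "application/json",
        # Assumption: correlation attribute holding the instance ID.
        "kogitoprocrefid": instance_id,
        # Assumption: payload shape is not specified in the source.
        "data": {"approved": approved},
    }


def approval_deadline(requested_at):
    """Return the moment at which the 24-hour event timeout fires."""
    return requested_at + EVENT_TIMEOUT


if __name__ == "__main__":
    event = build_approval_completed("hypothetical-instance-id", True)
    print(json.dumps(event, indent=2))
    print(approval_deadline(datetime.now(timezone.utc)).isoformat())
```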

Parallel execution: branches

The ProvisionAccount state runs branches in parallel:

  1. The engine launches both branches (CreateAccount and SendWelcome) concurrently.

  2. completionType: allOf means the workflow waits until both branches finish before transitioning.

  3. If a branch fails, the workflow can retry or handle the error according to the defined policy.

  4. The results of both branches are merged into the instance state data.
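
The allOf semantics described above can be illustrated with a small Python model. This is a sketch of the pattern only, not of the engine's internals: the branch functions and their return values are hypothetical stand-ins for CreateAccount and SendWelcome.

```python
from concurrent.futures import ThreadPoolExecutor


def create_account(data):
    """Stand-in for the CreateAccount branch."""
    return {"account": {"customer_id": data["customer"]["id"],
                        "status": "created"}}


def send_welcome(data):
    """Stand-in for the SendWelcome branch."""
    return {"welcome": {"to": data["customer"]["email"], "sent": True}}


def provision_account(data):
    """Run both branches concurrently and merge their results (allOf)."""
    branches = [create_account, send_welcome]
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, data) for branch in branches]
        # result() blocks until the branch finishes and re-raises its
        # exception, so the state completes only when every branch has.
        results = [future.result() for future in futures]
    merged = dict(data)
    for result in results:
        merged.update(result)
    return merged


if __name__ == "__main__":
    state = {"customer": {"id": 42, "email": "maria.premium@neuralbank.io"},
             "tier": "premium"}
    print(provision_account(state))
```

Because each branch writes to its own key, merging the results cannot overwrite data produced by the other branch.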

Data Index: state queries

The Data Index Service listens for engine state events via CloudEvents and indexes all instance information in PostgreSQL. It exposes a GraphQL endpoint that allows querying:

  • Active, completed, and failed instances

  • Current state data for each instance

  • Transition history (which states ran and when)

  • Process variables (customer data, tier, approval status)

The Management Console consumes this GraphQL API to show operational dashboards.
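
The same endpoint can be queried from a script. The Python sketch below builds a GraphQL request that lists instances in a given state. The service URL is a placeholder, and the query shape (ProcessInstances, the where filter, and the selected fields) reflects the Data Index schema as commonly documented; treat it as an assumption and confirm it against the schema exposed by your deployment.

```python
import json
import urllib.request

# Hypothetical in-cluster address; replace with the real Data Index route.
DATA_INDEX_URL = "http://data-index.example.svc/graphql"

# Assumed query shape; verify field names against the deployed schema.
QUERY_TEMPLATE = """
{
  ProcessInstances(where: {state: {equal: %s}}) {
    id
    processId
    state
    variables
    nodes { name enter exit }
  }
}
"""


def build_instances_query(state):
    """Build (but do not send) a GraphQL request filtered by instance state."""
    payload = {"query": QUERY_TEMPLATE % state}
    return urllib.request.Request(
        DATA_INDEX_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_instances_query("ACTIVE")
    print(json.loads(request.data)["query"])
```

The nodes selection corresponds to the transition history listed above, and variables to the process variables.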

Use Case: Banking Onboarding

The end-to-end flow for a bank (NeuralBank):

  1. A bank operator creates a new customer in the application (INSERT in PostgreSQL)

  2. Debezium captures the change and publishes to topic cdc.public.customers

  3. SonataFlow receives the CDC event as a CloudEvent

  4. The workflow validates customer data

  5. Classifies the customer by amount: basic / standard / premium

  6. For premium customers: requests manager approval with a 24h SLA

  7. On approval: provisions the account and sends welcome email in parallel

  8. Publishes onboarding metrics for operational dashboards

End-to-End Test

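-- Assumes customers has an amount column, which classifyCustomer reads as .customer.amount
-- Create basic customer (amount below 10000, auto-approved)
INSERT INTO customers (first_name, last_name, email, amount)
VALUES ('Juan', 'Basic', 'juan.basic@neuralbank.io', 5000);

-- Create premium customer (amount of 50000 or more, requires approval)
INSERT INTO customers (first_name, last_name, email, amount)
VALUES ('Maria', 'Premium', 'maria.premium@neuralbank.io', 75000);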

Visual Development with Kaoto

SonataFlow workflows can be designed visually using the Kaoto extension in Red Hat OpenShift Dev Spaces:

  1. Open Dev Spaces with the workspace that contains the project

  2. Install the Kaoto extension from the VS Code marketplace

  3. Open the workflow .sw.yaml file

  4. The visual editor shows the interactive workflow diagram

  5. Drag and drop states, events, and functions

  6. YAML is generated automatically

This integration lets business analysts and developers collaborate on process definitions using the same tool.

Comparison: Kogito vs RHPAM vs SonataFlow

| Feature | RHPAM 7.x (legacy) | Kogito | SonataFlow |
|---|---|---|---|
| Runtime | WildFly/EAP | Quarkus/Spring Boot | Quarkus (serverless) |
| Process definition | BPMN 2.0 | BPMN 2.0 / DMN | Serverless Workflow Spec |
| Deployment | Business Central + KIE Server | Kubernetes Operator | SonataFlow Operator |
| Scalability | Manual horizontal | Cloud-native auto-scaling | Knative serverless (scale-to-zero) |
| Kafka integration | Via JMS/AMQP bridge | Native (Quarkus Kafka) | Native CloudEvents |
| Visual designer | Business Central | VS Code Extension | Kaoto + VS Code |
| Status | Deprecated (migrate to SonataFlow) | Mature → migrate to SonataFlow | Recommended for new projects |