We are using WSO2 stream processor in Teckportal to implement the complex rule engine. Its is an open source and lightweight stream processing platform that understands streaming SQL queries in order to capture, analyze, process and act on events in real time. This facilitates real-time streaming data integration and analytics. Unlike other offerings, it provides a simple two-node deployment for high availability and scales beyond with its distributed deployment to cater to extremely high workloads.

WSO2 supports 3 types of deployment strategies:

  • Minimum high availability deployment.
  • Fully distributed deployment.
  • Multi data center high availability deployment.

We decided to go with the minimum high availability deployment, because it can handle upto 100K transactions per second and that is more than enough for our current use case.

The following picture demonstrates how this deployment works:

(Image source: https://docs.wso2.com/download/attachments/112390542/Minimum_HA_Architecture.png?version=1&modificationDate=1540265668000&api=v2)

In this minimum HA setup, one node is assigned as the active node while the other node is assigned as the passive node. Only the  active node processes the incoming events and publishes the outgoing  events. Internally, the active node publishes the events to the passive  node, but the passive node does not process or send any events outside as mentioned earlier. In a scenario where the active node fails, the passive node is activated, and it starts receiving events and then publishes them from where the active node left off. Once the terminated (previously active) node restarts, it operates in the passive state. In the passive node, the data-bridge port and other ports related to  sources remain closed, and events are not fetched from pulling sources  such as Kafka, JMS, etc., until the node becomes active.

Setup

Download the docker compose files using the following command:

wget https://product-dist.wso2.com/downloads/streaming-processor/4.4.0/instruction-pages/non-subscription/docker/docker-sp-4.4.0.1.zip

Extract the downloaded zip file and navigate to the folder:

cd docker-sp-4.4.0.1/docker-compose/editor-worker-dashboard/

Edit the docker compose file and remove the MySQL image, because we will be using PostgreSQL with high availability for this setup. The final docker-compose.yml will look like this:

version: '2.3'
networks:
  frontend:
    ipam:
      config:
        - subnet: 172.24.0.0/16
services:
  editor:
    image: wso2/wso2sp-editor:4.4.0
    container_name: wso2sp-editor
    ports:
      - "9743:9743"
      - "9390:9390"
    healthcheck:
      test: ["CMD", "nc", "-z","localhost", "9743"]
      interval: 10s
      timeout: 120s
      retries: 5
    volumes:
      - ./editor:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.3
  worker:
    image: wso2/wso2sp-worker:4.4.0
    container_name: wso2sp-worker
    ports:
      - "9090:9090"
      - "9443:9443"
      - "9893:9893"
    healthcheck:
      test: ["CMD", "nc", "-z","localhost", "9443"]
      interval: 10s
      timeout: 120s
      retries: 5
    volumes:
       - ./worker:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.2
  dashboard:
    image: wso2/wso2sp-dashboard:4.4.0
    container_name: wso2sp-dashboard
    ports:
      - "9643:9643"
    healthcheck:
      test: ["CMD", "nc", "-z","localhost", "9643"]
      interval: 10s
      timeout: 120s
      retries: 5
    depends_on:
      worker:
        condition: service_healthy
    volumes:
      - ./dashboard:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.4

Make sure that the IP address does not clash with the IP address of existing containers. Hard coding the IP address here makes it easier to add exceptions rules in the firewall.

The docker compose setup comes with two MySQL scripts which need to be run before starting the application. Since we are using PostgreSQL, we will have to convert them to work with it.

metrics.sql

CREATE DATABASE "WSO2_METRICS_DB";

\connect "WSO2_METRICS_DB";

CREATE SEQUENCE "METRIC_GAUGE_seq";

CREATE TABLE IF NOT EXISTS METRIC_GAUGE (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_GAUGE_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    VALUE VARCHAR(100) NOT NULL
);

CREATE SEQUENCE "METRIC_COUNTER_seq";

CREATE TABLE IF NOT EXISTS METRIC_COUNTER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_COUNTER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL
);

CREATE SEQUENCE "METRIC_METER_seq";

CREATE TABLE IF NOT EXISTS METRIC_METER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_METER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MEAN_RATE DECIMAL(10,0) NOT NULL,
    M1_RATE DECIMAL(10,0) NOT NULL,
    M5_RATE DECIMAL(10,0) NOT NULL,
    M15_RATE DECIMAL(10,0) NOT NULL,
    RATE_UNIT VARCHAR(50) NOT NULL
);

CREATE SEQUENCE "METRIC_HISTOGRAM_seq";

CREATE TABLE IF NOT EXISTS METRIC_HISTOGRAM (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_HISTOGRAM_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MAX DECIMAL(10,0) NOT NULL,
    MEAN DECIMAL(10,0) NOT NULL,
    MIN DECIMAL(10,0) NOT NULL,
    STDDEV DECIMAL(10,0) NOT NULL,
    P50 DECIMAL(10,0) NOT NULL,
    P75 DECIMAL(10,0) NOT NULL,
    P95 DECIMAL(10,0) NOT NULL,
    P98 DECIMAL(10,0) NOT NULL,
    P99 DECIMAL(10,0) NOT NULL,
    P999 DECIMAL(10,0) NOT NULL
);

CREATE SEQUENCE "METRIC_TIMER_seq";

CREATE TABLE IF NOT EXISTS METRIC_TIMER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_TIMER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MAX DECIMAL(10,0) NOT NULL,
    MEAN DECIMAL(10,0) NOT NULL,
    MIN DECIMAL(10,0) NOT NULL,
    STDDEV DECIMAL(10,0) NOT NULL,
    P50 DECIMAL(10,0) NOT NULL,
    P75 DECIMAL(10,0) NOT NULL,
    P95 DECIMAL(10,0) NOT NULL,
    P98 DECIMAL(10,0) NOT NULL,
    P99 DECIMAL(10,0) NOT NULL,
    P999 DECIMAL(10,0) NOT NULL,
    MEAN_RATE DECIMAL(10,0) NOT NULL,
    M1_RATE DECIMAL(10,0) NOT NULL,
    M5_RATE DECIMAL(10,0) NOT NULL,
    M15_RATE DECIMAL(10,0) NOT NULL,
    RATE_UNIT VARCHAR(50) NOT NULL,
    DURATION_UNIT VARCHAR(50) NOT NULL
);

CREATE INDEX "IDX_TIMESTAMP_GAUGE" ON METRIC_GAUGE (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_COUNTER" ON METRIC_COUNTER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_METER" ON METRIC_METER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_HISTOGRAM" ON METRIC_HISTOGRAM (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_TIMER" ON METRIC_TIMER (TIMESTAMP);

status.sql

CREATE DATABASE "WSO2_STATUS_DASHBOARD_DB";

\connect "WSO2_STATUS_DASHBOARD_DB";

CREATE SEQUENCE "METRIC_GAUGE_seq";

CREATE TABLE IF NOT EXISTS METRIC_GAUGE (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_GAUGE_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    VALUE VARCHAR(100) NOT NULL
);

CREATE SEQUENCE "METRIC_COUNTER_seq";

CREATE TABLE IF NOT EXISTS METRIC_COUNTER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_COUNTER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL
);

CREATE SEQUENCE "METRIC_METER_seq";

CREATE TABLE IF NOT EXISTS METRIC_METER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_METER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MEAN_RATE DECIMAL(10,0) NOT NULL,
    M1_RATE DECIMAL(10,0) NOT NULL,
    M5_RATE DECIMAL(10,0) NOT NULL,
    M15_RATE DECIMAL(10,0) NOT NULL,
    RATE_UNIT VARCHAR(50) NOT NULL
);

CREATE SEQUENCE "METRIC_HISTOGRAM_seq";

CREATE TABLE IF NOT EXISTS METRIC_HISTOGRAM (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_HISTOGRAM_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MAX DECIMAL(10,0) NOT NULL,
    MEAN DECIMAL(10,0) NOT NULL,
    MIN DECIMAL(10,0) NOT NULL,
    STDDEV DECIMAL(10,0) NOT NULL,
    P50 DECIMAL(10,0) NOT NULL,
    P75 DECIMAL(10,0) NOT NULL,
    P95 DECIMAL(10,0) NOT NULL,
    P98 DECIMAL(10,0) NOT NULL,
    P99 DECIMAL(10,0) NOT NULL,
    P999 DECIMAL(10,0) NOT NULL
);

CREATE SEQUENCE "METRIC_TIMER_seq";

CREATE TABLE IF NOT EXISTS METRIC_TIMER (
    ID BIGINT DEFAULT NEXTVAL ('"METRIC_TIMER_seq"') PRIMARY KEY,
    SOURCE VARCHAR(255) NOT NULL,
    TIMESTAMP BIGINT NOT NULL,
    NAME VARCHAR(255) NOT NULL,
    COUNT BIGINT NOT NULL,
    MAX DECIMAL(10,0) NOT NULL,
    MEAN DECIMAL(10,0) NOT NULL,
    MIN DECIMAL(10,0) NOT NULL,
    STDDEV DECIMAL(10,0) NOT NULL,
    P50 DECIMAL(10,0) NOT NULL,
    P75 DECIMAL(10,0) NOT NULL,
    P95 DECIMAL(10,0) NOT NULL,
    P98 DECIMAL(10,0) NOT NULL,
    P99 DECIMAL(10,0) NOT NULL,
    P999 DECIMAL(10,0) NOT NULL,
    MEAN_RATE DECIMAL(10,0) NOT NULL,
    M1_RATE DECIMAL(10,0) NOT NULL,
    M5_RATE DECIMAL(10,0) NOT NULL,
    M15_RATE DECIMAL(10,0) NOT NULL,
    RATE_UNIT VARCHAR(50) NOT NULL,
    DURATION_UNIT VARCHAR(50) NOT NULL
);

CREATE INDEX "IDX_TIMESTAMP_GAUGE" ON METRIC_GAUGE (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_COUNTER" ON METRIC_COUNTER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_METER" ON METRIC_METER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_HISTOGRAM" ON METRIC_HISTOGRAM (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_TIMER" ON METRIC_TIMER (TIMESTAMP);

Now, access the PostgreSQL database to run the above scripts and create a few other databases which are required.

sudo su postgres
psql

# Create wso2 role
create role wso2 with login password '<your password>'

# Create database DASHBOARD_DB
CREATE DATABASE "DASHBOARD_DB" with owner wso2

# Create database PERSISTENCE_DB
CREATE DATABASE "PERSISTENCE_DB" with owner wso2

# Create database WSO2_CLUSTER_DB
CREATE DATABASE "WSO2_CLUSTER_DB" with owner wso2

# Create database WSO2_METRICS_DB
CREATE DATABASE "WSO2_METRICS_DB" with owner wso2

# Create database WSO2_STATUS_DASHBOARD_DB
CREATE DATABASE "WSO2_STATUS_DASHBOARD_DB" with owner wso2

# Run the above mentioned scripts
\i status.sql
\i metrics.sql

Now, modify the deployment files for both worker and dashboard to update the database configuration for the databases created above.

Dashboard config can be found in:  dashboard/conf/dashboard/deployment.yaml

Worker config can be found in:  worker/conf/worker/deployment.yaml

Sample config for PostgreSQL database inside the deploymet.yaml file is shown below:

- name: WSO2_DASHBOARD_DB
    description: The datasource used for dashboard feature
    jndiConfig:
      name: jdbc/DASHBOARD_DB
      useJndiReference: true
    definition:
      type: RDBMS
      configuration:
        jdbcUrl: jdbc:postgresql://<ip>:<port>/DASHBOARD_DB
        username: woso2
        password: password
        driverClassName: org.postgresql.Driver
        maxPoolSize: 10
        idleTimeout: 60000
        connectionTestQuery: SELECT 1
        validationTimeout: 30000
        isAutoCommit: false

In the dashboard deployment.yaml update the configurations for these databases - DASHBOARD_DB, WSO2_STATUS_DASHBOARD_DB, WSO2_METRICS_DB

In the worker deployment.yaml update the configurations for these databases - WSO2_CLUSTER_DB, WSO2_METRICS_DB, PERSISTENCE_DB

Now edit the worker deployment.yaml file in both the servers to support high availability:

  1. To allow the two nodes to use same persistence storage, you need to configure persistence configuration under the state.persistence.
state.persistence:
  enabled: true
  intervalInMin: 1
  revisionsToKeep: 2
  persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
  config:
    datasource: PERSISTENCE_DB   # A datasource with this name should be defined in wso2.datasources namespace
    table: PERSISTENCE_TABLE

2. Configure cluster coordination.

cluster.config:
  enabled: true
  groupId: <Group ID>
  coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  strategyConfig:
    datasource: WSO2_CLUSTER_DB
    heartbeatInterval: 1000
    heartbeatMaxRetry: 2
    eventPollingInterval: 1000

The group id should be different for both the servers.

3. Add the deployment.config section.

deployment.config:
  type: ha
  eventByteBufferQueueCapacity: 20000
  byteBufferExtractorThreadPoolSize: 5
  eventSyncServer:
    host: 0.0.0.0
    port: 9893
    advertisedHost: <Public ip address of the server>
    advertisedPort: 9893
    bossThreads: 10
    workerThreads: 10
  eventSyncClientPool:
    maxActive: 10
    maxTotal: 10
    maxIdle: 10
    maxWait: 60000
    minEvictableIdleTimeMillis: 120000

The following diagram illustrates how events can be published to a  two-node minimum HA cluster that uses sources such as HTTP, MQTT etc.

(Image source: https://docs.wso2.com/download/attachments/112390542/Publish_Events.png?version=1&modificationDate=1540265670000&api=v2)

Make sure that Siddhi applications are added to both the nodes, so that when the active node goes down, the passive node become active and deploys the application.