WSO2 Stream Processor High Availability Deployment
We use WSO2 Stream Processor in Teckportal to implement the complex rule engine. It is an open-source, lightweight stream processing platform that understands streaming SQL queries in order to capture, analyze, process and act on events in real time. This facilitates real-time streaming data integration and analytics. Unlike other offerings, it provides a simple two-node deployment for high availability and scales further with its distributed deployment to cater to extremely high workloads.
WSO2 supports 3 types of deployment strategies:
- Minimum high availability deployment.
- Fully distributed deployment.
- Multi data center high availability deployment.
We decided to go with the minimum high availability deployment because it can handle up to 100K transactions per second, which is more than enough for our current use case.
The following picture demonstrates how this deployment works:
In this minimum HA setup, one node is assigned as the active node and the other as the passive node. Only the active node processes incoming events and publishes outgoing events. Internally, the active node also publishes the events to the passive node, but the passive node does not process them or send any events out. If the active node fails, the passive node is activated: it starts receiving events and publishes them from where the active node left off. Once the terminated (previously active) node restarts, it operates in the passive state. On the passive node, the data-bridge port and other source-related ports remain closed, and events are not fetched from pull-based sources such as Kafka or JMS until the node becomes active.
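The active/passive behaviour described above can be pictured with a small toy model. This is only an illustrative sketch in Python, not WSO2 code: the `Node` and `Cluster` classes and their methods are hypothetical names invented for this example, and the model ignores state synchronisation details.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    name: str
    alive: bool = True
    published: list = field(default_factory=list)


class Cluster:
    """Toy two-node cluster: one active node publishes, the other only mirrors."""

    def __init__(self, first: Node, second: Node):
        self.active, self.passive = first, second
        self.synced = []  # events the active node has forwarded to the passive node

    def receive(self, event):
        if not self.active.alive:
            self.fail_over()
        self.active.published.append(event)  # only the active node publishes
        self.synced.append(event)            # the passive node just keeps a copy

    def fail_over(self):
        # The passive node takes over; the old active node becomes the passive one.
        self.active, self.passive = self.passive, self.active

    def restart(self, node: Node):
        node.alive = True  # a restarted node stays in the passive role


a, b = Node("node-1"), Node("node-2")
cluster = Cluster(a, b)
cluster.receive("e1")
a.alive = False          # the active node crashes
cluster.receive("e2")    # node-2 is promoted and publishes from here on
cluster.restart(a)
cluster.receive("e3")
print(cluster.active.name, a.published, b.published)
# node-2 ['e1'] ['e2', 'e3']
```

Note that node-1 does not reclaim the active role after it restarts, which matches the behaviour described in the paragraph above.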
Setup
Download the docker compose files using the following command:
wget https://product-dist.wso2.com/downloads/streaming-processor/4.4.0/instruction-pages/non-subscription/docker/docker-sp-4.4.0.1.zip
Extract the downloaded zip file and navigate to the folder:
cd docker-sp-4.4.0.1/docker-compose/editor-worker-dashboard/
Edit the docker compose file and remove the MySQL image, because we will be using PostgreSQL with high availability for this setup. The final docker-compose.yml will look like this:
version: '2.3'
networks:
  frontend:
    ipam:
      config:
        - subnet: 172.24.0.0/16
services:
  editor:
    image: wso2/wso2sp-editor:4.4.0
    container_name: wso2sp-editor
    ports:
      - "9743:9743"
      - "9390:9390"
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9743"]
      interval: 10s
      timeout: 120s
      retries: 5
    volumes:
      - ./editor:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.3
  worker:
    image: wso2/wso2sp-worker:4.4.0
    container_name: wso2sp-worker
    ports:
      - "9090:9090"
      - "9443:9443"
      - "9893:9893"
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9443"]
      interval: 10s
      timeout: 120s
      retries: 5
    volumes:
      - ./worker:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.2
  dashboard:
    image: wso2/wso2sp-dashboard:4.4.0
    container_name: wso2sp-dashboard
    ports:
      - "9643:9643"
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9643"]
      interval: 10s
      timeout: 120s
      retries: 5
    depends_on:
      worker:
        condition: service_healthy
    volumes:
      - ./dashboard:/home/wso2carbon/wso2-config-volume
    networks:
      frontend:
        ipv4_address: 172.24.0.4
Make sure that these IP addresses do not clash with the IP addresses of existing containers. Hard-coding the IP addresses here makes it easier to add exception rules in the firewall.
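A quick way to sanity-check the static addresses is a few lines of Python using the standard `ipaddress` module. The sketch below only validates the three addresses from this compose file against the subnet and against each other; the function name `check_static_ips` is made up for this example, and addresses of containers that already exist on the host still have to be checked separately (for instance with `docker network inspect`).

```python
import ipaddress

SUBNET = ipaddress.ip_network("172.24.0.0/16")
STATIC_IPS = {
    "editor": "172.24.0.3",
    "worker": "172.24.0.2",
    "dashboard": "172.24.0.4",
}


def check_static_ips(subnet, assignments):
    """Return a list of problems; an empty list means the assignments look fine."""
    problems = []
    seen = {}
    for service, ip in assignments.items():
        address = ipaddress.ip_address(ip)
        if address not in subnet:
            problems.append(f"{service}: {ip} is outside {subnet}")
        if address in seen:
            problems.append(f"{service}: {ip} is already used by {seen[address]}")
        seen.setdefault(address, service)
    return problems


print(check_static_ips(SUBNET, STATIC_IPS))  # [] means no clash was found
```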
The docker compose setup comes with two MySQL scripts which need to be run before starting the application. Since we are using PostgreSQL, we will have to convert them to work with it.
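The main change in such a conversion is the auto-generated ID column. The sketch below assumes the shipped MySQL scripts declare it with `AUTO_INCREMENT` (the original scripts are not reproduced in this post, so treat that as an assumption) and rewrites it to the sequence plus `NEXTVAL` form used in the PostgreSQL scripts that follow. `convert_auto_increment` is a hypothetical helper, and it does not handle backtick-quoted names or other MySQL-specific syntax.

```python
import re


def convert_auto_increment(ddl: str) -> str:
    """Rewrite one MySQL CREATE TABLE statement to use a PostgreSQL sequence."""
    match = re.search(
        r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)", ddl, re.IGNORECASE
    )
    if match is None or "AUTO_INCREMENT" not in ddl.upper():
        return ddl  # nothing to convert
    sequence = f'"{match.group(1)}_seq"'
    converted = re.sub(
        r"\bAUTO_INCREMENT\b",
        f"DEFAULT NEXTVAL ('{sequence}')",
        ddl,
        flags=re.IGNORECASE,
    )
    # The sequence has to exist before the table that references it.
    return f"CREATE SEQUENCE {sequence};\n{converted}"


mysql_ddl = (
    "CREATE TABLE IF NOT EXISTS METRIC_GAUGE (\n"
    "  ID BIGINT AUTO_INCREMENT PRIMARY KEY,\n"
    "  NAME VARCHAR(255) NOT NULL\n"
    ");"
)
print(convert_auto_increment(mysql_ddl))
```

Review the generated statements by hand before running them; a regex rewrite like this is a convenience, not a full SQL dialect translator.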
metrics.sql
CREATE DATABASE "WSO2_METRICS_DB";
\connect "WSO2_METRICS_DB"
CREATE SEQUENCE "METRIC_GAUGE_seq";
CREATE TABLE IF NOT EXISTS METRIC_GAUGE (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_GAUGE_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
VALUE VARCHAR(100) NOT NULL
);
CREATE SEQUENCE "METRIC_COUNTER_seq";
CREATE TABLE IF NOT EXISTS METRIC_COUNTER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_COUNTER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL
);
CREATE SEQUENCE "METRIC_METER_seq";
CREATE TABLE IF NOT EXISTS METRIC_METER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_METER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MEAN_RATE DECIMAL(10,0) NOT NULL,
M1_RATE DECIMAL(10,0) NOT NULL,
M5_RATE DECIMAL(10,0) NOT NULL,
M15_RATE DECIMAL(10,0) NOT NULL,
RATE_UNIT VARCHAR(50) NOT NULL
);
CREATE SEQUENCE "METRIC_HISTOGRAM_seq";
CREATE TABLE IF NOT EXISTS METRIC_HISTOGRAM (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_HISTOGRAM_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MAX DECIMAL(10,0) NOT NULL,
MEAN DECIMAL(10,0) NOT NULL,
MIN DECIMAL(10,0) NOT NULL,
STDDEV DECIMAL(10,0) NOT NULL,
P50 DECIMAL(10,0) NOT NULL,
P75 DECIMAL(10,0) NOT NULL,
P95 DECIMAL(10,0) NOT NULL,
P98 DECIMAL(10,0) NOT NULL,
P99 DECIMAL(10,0) NOT NULL,
P999 DECIMAL(10,0) NOT NULL
);
CREATE SEQUENCE "METRIC_TIMER_seq";
CREATE TABLE IF NOT EXISTS METRIC_TIMER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_TIMER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MAX DECIMAL(10,0) NOT NULL,
MEAN DECIMAL(10,0) NOT NULL,
MIN DECIMAL(10,0) NOT NULL,
STDDEV DECIMAL(10,0) NOT NULL,
P50 DECIMAL(10,0) NOT NULL,
P75 DECIMAL(10,0) NOT NULL,
P95 DECIMAL(10,0) NOT NULL,
P98 DECIMAL(10,0) NOT NULL,
P99 DECIMAL(10,0) NOT NULL,
P999 DECIMAL(10,0) NOT NULL,
MEAN_RATE DECIMAL(10,0) NOT NULL,
M1_RATE DECIMAL(10,0) NOT NULL,
M5_RATE DECIMAL(10,0) NOT NULL,
M15_RATE DECIMAL(10,0) NOT NULL,
RATE_UNIT VARCHAR(50) NOT NULL,
DURATION_UNIT VARCHAR(50) NOT NULL
);
CREATE INDEX "IDX_TIMESTAMP_GAUGE" ON METRIC_GAUGE (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_COUNTER" ON METRIC_COUNTER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_METER" ON METRIC_METER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_HISTOGRAM" ON METRIC_HISTOGRAM (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_TIMER" ON METRIC_TIMER (TIMESTAMP);
status.sql
CREATE DATABASE "WSO2_STATUS_DASHBOARD_DB";
\connect "WSO2_STATUS_DASHBOARD_DB"
CREATE SEQUENCE "METRIC_GAUGE_seq";
CREATE TABLE IF NOT EXISTS METRIC_GAUGE (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_GAUGE_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
VALUE VARCHAR(100) NOT NULL
);
CREATE SEQUENCE "METRIC_COUNTER_seq";
CREATE TABLE IF NOT EXISTS METRIC_COUNTER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_COUNTER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL
);
CREATE SEQUENCE "METRIC_METER_seq";
CREATE TABLE IF NOT EXISTS METRIC_METER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_METER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MEAN_RATE DECIMAL(10,0) NOT NULL,
M1_RATE DECIMAL(10,0) NOT NULL,
M5_RATE DECIMAL(10,0) NOT NULL,
M15_RATE DECIMAL(10,0) NOT NULL,
RATE_UNIT VARCHAR(50) NOT NULL
);
CREATE SEQUENCE "METRIC_HISTOGRAM_seq";
CREATE TABLE IF NOT EXISTS METRIC_HISTOGRAM (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_HISTOGRAM_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MAX DECIMAL(10,0) NOT NULL,
MEAN DECIMAL(10,0) NOT NULL,
MIN DECIMAL(10,0) NOT NULL,
STDDEV DECIMAL(10,0) NOT NULL,
P50 DECIMAL(10,0) NOT NULL,
P75 DECIMAL(10,0) NOT NULL,
P95 DECIMAL(10,0) NOT NULL,
P98 DECIMAL(10,0) NOT NULL,
P99 DECIMAL(10,0) NOT NULL,
P999 DECIMAL(10,0) NOT NULL
);
CREATE SEQUENCE "METRIC_TIMER_seq";
CREATE TABLE IF NOT EXISTS METRIC_TIMER (
ID BIGINT DEFAULT NEXTVAL ('"METRIC_TIMER_seq"') PRIMARY KEY,
SOURCE VARCHAR(255) NOT NULL,
TIMESTAMP BIGINT NOT NULL,
NAME VARCHAR(255) NOT NULL,
COUNT BIGINT NOT NULL,
MAX DECIMAL(10,0) NOT NULL,
MEAN DECIMAL(10,0) NOT NULL,
MIN DECIMAL(10,0) NOT NULL,
STDDEV DECIMAL(10,0) NOT NULL,
P50 DECIMAL(10,0) NOT NULL,
P75 DECIMAL(10,0) NOT NULL,
P95 DECIMAL(10,0) NOT NULL,
P98 DECIMAL(10,0) NOT NULL,
P99 DECIMAL(10,0) NOT NULL,
P999 DECIMAL(10,0) NOT NULL,
MEAN_RATE DECIMAL(10,0) NOT NULL,
M1_RATE DECIMAL(10,0) NOT NULL,
M5_RATE DECIMAL(10,0) NOT NULL,
M15_RATE DECIMAL(10,0) NOT NULL,
RATE_UNIT VARCHAR(50) NOT NULL,
DURATION_UNIT VARCHAR(50) NOT NULL
);
CREATE INDEX "IDX_TIMESTAMP_GAUGE" ON METRIC_GAUGE (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_COUNTER" ON METRIC_COUNTER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_METER" ON METRIC_METER (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_HISTOGRAM" ON METRIC_HISTOGRAM (TIMESTAMP);
CREATE INDEX "IDX_TIMESTAMP_TIMER" ON METRIC_TIMER (TIMESTAMP);
Now, access the PostgreSQL database to run the above scripts and create a few other databases which are required.
sudo su postgres
psql
-- Create the wso2 role
CREATE ROLE wso2 WITH LOGIN PASSWORD '<your password>';
-- Create database DASHBOARD_DB
CREATE DATABASE "DASHBOARD_DB" WITH OWNER wso2;
-- Create database PERSISTENCE_DB
CREATE DATABASE "PERSISTENCE_DB" WITH OWNER wso2;
-- Create database WSO2_CLUSTER_DB
CREATE DATABASE "WSO2_CLUSTER_DB" WITH OWNER wso2;
-- Create database WSO2_METRICS_DB
CREATE DATABASE "WSO2_METRICS_DB" WITH OWNER wso2;
-- Create database WSO2_STATUS_DASHBOARD_DB
CREATE DATABASE "WSO2_STATUS_DASHBOARD_DB" WITH OWNER wso2;
-- Run the scripts shown above. Their CREATE DATABASE statements report
-- "already exists" at this point; psql continues with the rest of each script.
\i status.sql
\i metrics.sql
Now, modify the deployment files for both worker and dashboard to update the database configuration for the databases created above.
Dashboard config can be found in: dashboard/conf/dashboard/deployment.yaml
Worker config can be found in: worker/conf/worker/deployment.yaml
A sample configuration for a PostgreSQL database inside the deployment.yaml file is shown below:
  - name: WSO2_DASHBOARD_DB
    description: The datasource used for dashboard feature
    jndiConfig:
      name: jdbc/DASHBOARD_DB
      useJndiReference: true
    definition:
      type: RDBMS
      configuration:
        jdbcUrl: jdbc:postgresql://<ip>:<port>/DASHBOARD_DB
        username: wso2
        password: <your password>
        driverClassName: org.postgresql.Driver
        maxPoolSize: 10
        idleTimeout: 60000
        connectionTestQuery: SELECT 1
        validationTimeout: 30000
        isAutoCommit: false
In the dashboard deployment.yaml, update the configuration for these databases: DASHBOARD_DB, WSO2_STATUS_DASHBOARD_DB and WSO2_METRICS_DB.
In the worker deployment.yaml, update the configuration for these databases: WSO2_CLUSTER_DB, WSO2_METRICS_DB and PERSISTENCE_DB.
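To keep track of which file needs which datasource, the mapping above can be written down as data and used to generate the JDBC URLs. This is a convenience sketch only: `DATASOURCES`, `jdbc_url` and `urls_for` are names invented here, `db.example.internal` is a placeholder host, and 5432 is simply PostgreSQL's default port.

```python
# Which databases each deployment.yaml has to point at, as listed above.
DATASOURCES = {
    "dashboard": ["DASHBOARD_DB", "WSO2_STATUS_DASHBOARD_DB", "WSO2_METRICS_DB"],
    "worker": ["WSO2_CLUSTER_DB", "WSO2_METRICS_DB", "PERSISTENCE_DB"],
}


def jdbc_url(host: str, port: int, database: str) -> str:
    """Build a PostgreSQL JDBC URL in the same shape as the sample config."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def urls_for(component: str, host: str, port: int = 5432) -> dict:
    """Map each database needed by a component to its JDBC URL."""
    return {db: jdbc_url(host, port, db) for db in DATASOURCES[component]}


for name, url in urls_for("worker", "db.example.internal").items():
    print(f"{name}: {url}")

# WSO2_METRICS_DB is the only database both components share.
shared = set(DATASOURCES["dashboard"]) & set(DATASOURCES["worker"])
print(shared)
```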
Now edit the worker deployment.yaml file on both servers to support high availability:
1. To allow the two nodes to use the same persistence store, configure the state.persistence section.
state.persistence:
  enabled: true
  intervalInMin: 1
  revisionsToKeep: 2
  persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
  config:
    datasource: PERSISTENCE_DB # A datasource with this name should be defined in wso2.datasources namespace
    table: PERSISTENCE_TABLE
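The effect of intervalInMin and revisionsToKeep can be illustrated with a toy snapshot store. This is a conceptual sketch, not how DBPersistenceStore is implemented: `RevisionStore` is a hypothetical class, and it assumes that one snapshot is written per interval and that only the newest revisions are retained.

```python
from collections import deque


class RevisionStore:
    """Toy snapshot store that keeps only the newest `revisions_to_keep` entries."""

    def __init__(self, revisions_to_keep: int):
        self.revisions = deque(maxlen=revisions_to_keep)

    def persist(self, minute: int, state: dict):
        # deque(maxlen=...) silently drops the oldest snapshot when it is full.
        self.revisions.append((minute, dict(state)))

    def latest(self):
        return self.revisions[-1] if self.revisions else None


store = RevisionStore(revisions_to_keep=2)
state = {"count": 0}
for minute in range(1, 5):      # intervalInMin: 1, so one snapshot per minute
    state["count"] += 10
    store.persist(minute, state)

print(list(store.revisions))    # [(3, {'count': 30}), (4, {'count': 40})]
print(store.latest())           # (4, {'count': 40})
```

With a one-minute interval, a node that takes over restores state that is at most about a minute old in this model; a shorter interval narrows that gap at the cost of more frequent writes to PERSISTENCE_DB.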
2. Configure cluster coordination.
cluster.config:
  enabled: true
  groupId: <Group ID>
  coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  strategyConfig:
    datasource: WSO2_CLUSTER_DB
    heartbeatInterval: 1000
    heartbeatMaxRetry: 2
    eventPollingInterval: 1000
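The group ID must be the same on both servers: nodes that share a groupId are treated as members of the same cluster. What has to be unique per node is the node id in the wso2.carbon section of deployment.yaml.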
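To get a feel for the heartbeat settings in cluster.config, the following sketch estimates how long a node can stay silent before it is treated as inactive. It rests on a simplifying assumption, namely that a node is considered inactive once heartbeatMaxRetry consecutive heartbeats of heartbeatInterval milliseconds have been missed; the function names are hypothetical and the real coordinator may time things differently.

```python
def failure_detection_window_ms(heartbeat_interval_ms: int, heartbeat_max_retry: int) -> int:
    """Silence after which a node is treated as inactive (simplified model)."""
    return heartbeat_interval_ms * heartbeat_max_retry


def is_inactive(last_heartbeat_ms: int, now_ms: int,
                heartbeat_interval_ms: int = 1000, heartbeat_max_retry: int = 2) -> bool:
    """True if the last heartbeat is older than the detection window."""
    window = failure_detection_window_ms(heartbeat_interval_ms, heartbeat_max_retry)
    return now_ms - last_heartbeat_ms > window


print(failure_detection_window_ms(1000, 2))            # 2000
print(is_inactive(last_heartbeat_ms=0, now_ms=1500))   # False
print(is_inactive(last_heartbeat_ms=0, now_ms=2500))   # True
```

Under this model, the values used above give a detection window of roughly two seconds. Raising heartbeatMaxRetry makes the cluster more tolerant of short database or network hiccups but delays failover.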
3. Add the deployment.config section.
deployment.config:
  type: ha
  eventByteBufferQueueCapacity: 20000
  byteBufferExtractorThreadPoolSize: 5
  eventSyncServer:
    host: 0.0.0.0
    port: 9893
    advertisedHost: <Public ip address of the server>
    advertisedPort: 9893
    bossThreads: 10
    workerThreads: 10
  eventSyncClientPool:
    maxActive: 10
    maxTotal: 10
    maxIdle: 10
    maxWait: 60000
    minEvictableIdleTimeMillis: 120000
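Because the two nodes talk to each other over the event sync port, it is worth confirming that each node can reach the other's advertisedHost on port 9893 through the firewall. The helper below is a rough Python equivalent of the `nc -z` check used in the compose health checks; `port_open` is a name chosen for this example, and the demo runs against a throwaway local listener so that it works anywhere.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Self-check against a temporary local listener.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))      # port 0 lets the OS pick a free port
listener.listen(1)
free_port = listener.getsockname()[1]
print(port_open("127.0.0.1", free_port))   # the listener is up
listener.close()
print(port_open("127.0.0.1", free_port))   # the listener is gone
```

On the real servers you would call something like `port_open("<advertisedHost of the other node>", 9893)` from each node in turn.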
The following diagram illustrates how events can be published to a two-node minimum HA cluster that uses sources such as HTTP, MQTT etc.
Make sure that the Siddhi applications are added to both nodes, so that when the active node goes down, the passive node becomes active and deploys the applications.
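One way to catch a missing application early is to compare the Siddhi files present on the two nodes. The sketch below compares file names only (not contents) between two directories; `missing_apps` is a hypothetical helper, and the directories are stand-ins, so in practice you would point it at copies or mounts of each node's Siddhi application folder.

```python
from pathlib import Path
import tempfile


def missing_apps(active_dir: str, passive_dir: str) -> dict:
    """Compare the *.siddhi file names found in two directories."""
    active = {p.name for p in Path(active_dir).glob("*.siddhi")}
    passive = {p.name for p in Path(passive_dir).glob("*.siddhi")}
    return {
        "missing_on_passive": sorted(active - passive),
        "missing_on_active": sorted(passive - active),
    }


# Demo with two temporary directories standing in for the nodes' app folders.
with tempfile.TemporaryDirectory() as node1, tempfile.TemporaryDirectory() as node2:
    (Path(node1) / "RuleEngine.siddhi").write_text("-- app definition")
    (Path(node1) / "Alerts.siddhi").write_text("-- app definition")
    (Path(node2) / "RuleEngine.siddhi").write_text("-- app definition")
    print(missing_apps(node1, node2))
    # {'missing_on_passive': ['Alerts.siddhi'], 'missing_on_active': []}
```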