1
0
mirror of https://github.com/nttgin/BGPalerter.git synced 2024-05-19 06:50:08 +00:00

Merge pull request #135 from nttgin/fastpipe

IPC/PubSub performance improvement
This commit is contained in:
Massimo Candela
2020-01-14 14:40:22 +01:00
committed by GitHub
8 changed files with 49 additions and 15 deletions

View File

@@ -6,6 +6,7 @@ connectors:
params:
carefulSubscription: true
url: wss://ris-live.ripe.net/v1/ws/
perMessageDeflate: true
subscription:
moreSpecific: true
type: UPDATE
@@ -150,6 +151,7 @@ reports:
# urls:
# default: _YOUR_ALERTA_API_URL_
############################
# Notification settings:
# - notificationIntervalSeconds
@@ -166,11 +168,6 @@ alertOnlyOnce: false
############################
# Below the files containing the monitored prefixes. Please see prefixes.yml for an example.
# This is an array (use new lines and dashes!)
monitoredPrefixesFiles:
- prefixes.yml
logging:
directory: logs
logRotatePattern: YYYY-MM-DD # Whenever the pattern changes, a new file is created and the old one rotated
@@ -180,6 +177,9 @@ logging:
checkForUpdatesAtBoot: true
############################
# Process monitoring settings:
# Uncomment or add classes under uptimeMonitors if you want to monitor or send logs about the status of the BGPalerter process
#uptimeMonitors:
# - file: uptimeApi
@@ -194,4 +194,15 @@ checkForUpdatesAtBoot: true
# intervalSeconds: 300
# method: get
############################
pidFile: bgpalerter.pid
multiProcess: false
############################
# Below the files containing the monitored prefixes. Please see prefixes.yml for an example.
# This is an array (use new lines and dashes!)
monitoredPrefixesFiles:
- prefixes.yml

View File

@@ -44,6 +44,10 @@ export default class Connector {
this.connectCallback = null;
this.errorCallback = null;
this.disconnectCallback = null;
this.timerBatch = null;
this.batch = [];
this.batchInterval = 500;
}
connect = () =>
@@ -64,10 +68,20 @@ export default class Connector {
this.disconnectCallback(message);
};
_sendBatch = () => {
clearTimeout(this.timerBatch);
delete this.timerBatch;
if (this.messageCallback && this.batch.length) {
this.messageCallback(this.name + "-" + '[' + this.batch.join(',') + ']');
}
this.batch = [];
};
_message = (message) => {
const msg = this.name + "-" + message;
if (this.messageCallback)
this.messageCallback(msg);
if (!this.timerBatch) {
this.timerBatch = setTimeout(this._sendBatch, this.batchInterval);
}
this.batch.push(message);
};
_connect = (message) => {

View File

@@ -72,7 +72,7 @@ export default class ConnectorRIS extends Connector{
new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(this.url, {
perMessageDeflate: false
perMessageDeflate: this.params.perMessageDeflate
});
this.ws.on('message', this._message);

View File

@@ -52,11 +52,12 @@ export default class Consumer {
});
};
dispatch = (data) => {
try {
const connector = data.slice(0,3);
const connector = data.slice(0, 3);
const messagesRaw = JSON.parse(data.slice(4));
const messages = this.connectors[connector].transform(messagesRaw) || [];
const messages = [].concat.apply([], messagesRaw.map(this.connectors[connector].transform)) || [];
for (let monitor of this.monitors) {

View File

@@ -55,6 +55,7 @@ let config = {
params: {
carefulSubscription: true,
url: "wss://ris-live.ripe.net/v1/ws/",
perMessageDeflate: true,
subscription: {
moreSpecific: true,
type: "UPDATE",

View File

@@ -48,7 +48,7 @@ export default class Worker {
this.configFile = env.configFile;
if (this.config.environment === "test") {
if (!this.config.multiProcess) {
this.master();
new Consumer();
@@ -63,6 +63,11 @@ export default class Worker {
};
_multiProcessSend (message) {
this.send(message);
};
_singleProcessSend (message) {
return this.pubSub.publish("data", message);
};
@@ -108,7 +113,7 @@ export default class Worker {
for (const connector of connectorFactory.getConnectors()) {
if (worker){
connector.onMessage(worker.send.bind(worker));
connector.onMessage(this._multiProcessSend.bind(worker));
} else {
connector.onMessage(this._singleProcessSend);
}

View File

@@ -75,7 +75,8 @@ describe("Composition", function() {
"logging",
"checkForUpdatesAtBoot",
"uptimeMonitors",
"pidFile"
"pidFile",
"multiProcess"
]);
expect(config.connectors[0]).to.have
.property('class')

View File

@@ -71,4 +71,5 @@ uptimeMonitors:
host: null
port: 8011
pidFile: bgpalerter.pid
pidFile: bgpalerter.pid
multiProcess: false