From e0c46542d93d74eae18e88d855e56b00abe4e4c2 Mon Sep 17 00:00:00 2001 From: Massimo Candela Date: Thu, 16 Jan 2020 17:04:30 +0100 Subject: [PATCH] reverted to not bundled IPC communication + moved JSON parsing on master process --- src/connectors/connector.js | 13 ++--------- src/connectors/connectorRIS.js | 11 ++++++---- src/connectors/connectorSwUpdates.js | 4 ++-- src/connectors/connectorTest.js | 2 +- src/consumer.js | 9 +++++--- src/model.js | 32 ++++++++++++++++------------ src/worker.js | 12 +++++++++-- 7 files changed, 46 insertions(+), 37 deletions(-) diff --git a/src/connectors/connector.js b/src/connectors/connector.js index 2625a7a..4360094 100644 --- a/src/connectors/connector.js +++ b/src/connectors/connector.js @@ -44,9 +44,6 @@ export default class Connector { this.connectCallback = null; this.errorCallback = null; this.disconnectCallback = null; - - this.batch = []; - setInterval(this._sendBatch, 500); } connect = () => @@ -67,15 +64,9 @@ export default class Connector { this.disconnectCallback(message); }; - _sendBatch = () => { - if (this.messageCallback && this.batch.length) { - this.messageCallback(this.name + "-" + '[' + this.batch.join(',') + ']'); - } - this.batch = []; - }; - _message = (message) => { - this.batch.push(message); + if (this.messageCallback) + this.messageCallback(message); }; _connect = (message) => { diff --git a/src/connectors/connectorRIS.js b/src/connectors/connectorRIS.js index 1cc05e7..9f2efc9 100644 --- a/src/connectors/connectorRIS.js +++ b/src/connectors/connectorRIS.js @@ -41,7 +41,7 @@ export default class ConnectorRIS extends Connector{ super(name, params, env); this.ws = null; this.subscription = null; - setInterval(this._ping, 10000); + setInterval(this._ping, 5000); this.url = brembo.build(this.params.url, { path: [], @@ -55,8 +55,7 @@ export default class ConnectorRIS extends Connector{ _ping = () => { if (this.ws) { try { - this.ws.ping(() => { - }); + this.ws.ping(); } catch (e) { // Nothing to do here } @@ -68,6 +67,10 @@ export default class ConnectorRIS extends Connector{ this._connect(this.name + ' connector connected'); }; + _messageToJson = (message) => { + this._message(JSON.parse(message)); + }; + connect = () => new Promise((resolve, reject) => { try { @@ -75,7 +78,7 @@ export default class ConnectorRIS extends Connector{ perMessageDeflate: this.params.perMessageDeflate }); - this.ws.on('message', this._message); + this.ws.on('message', this._messageToJson); this.ws.on('close', (error) => { this._close("RIPE RIS disconnected (error: " + error + "). Please, provide a feedback to rislive@ripe.net on the importance of the reliability of this service."); }); diff --git a/src/connectors/connectorSwUpdates.js b/src/connectors/connectorSwUpdates.js index 5d51d23..066c429 100644 --- a/src/connectors/connectorSwUpdates.js +++ b/src/connectors/connectorSwUpdates.js @@ -52,12 +52,12 @@ export default class ConnectorSwUpdates extends Connector{ }) .then(data => { if (data && data.data && data.data.version && data.data.version !== this.version){ - this._message(JSON.stringify({ + this._message({ type: "software-update", currentVersion: this.version, newVersion: data.data.version, repo: "https://github.com/nttgin/BGPalerter" - })); + }); } }) .catch(() => { diff --git a/src/connectors/connectorTest.js b/src/connectors/connectorTest.js index 9654ddc..9ab3c39 100644 --- a/src/connectors/connectorTest.js +++ b/src/connectors/connectorTest.js @@ -304,7 +304,7 @@ export default class ConnectorTest extends Connector{ this.timer = setInterval(() => { updates.forEach(update => { - this._message(JSON.stringify(update)); + this._message(update); if (type === 'visibility') { let peer = update.data.peer.split('.'); peer[3] = Math.min(parseInt(peer[3]) + 1, 254); diff --git a/src/consumer.js b/src/consumer.js index 72efd82..a27f059 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -55,9 +55,12 @@ export default class Consumer { dispatch = (data) => { try { - const connector = data.slice(0, 3); - const messagesRaw = JSON.parse(data.slice(4)); - const messages = [].concat.apply([], messagesRaw.map(this.connectors[connector].transform)) || []; + // const connector = data.slice(0, 3); + // const messagesRaw = JSON.parse(data.slice(4)); + + const connector = data.connector; + const messagesRaw = data.message; + const messages = this.connectors[connector].transform(messagesRaw) || []; for (let monitor of this.monitors) { diff --git a/src/model.js b/src/model.js index d874c54..1f97458 100644 --- a/src/model.js +++ b/src/model.js @@ -27,7 +27,7 @@ export class AS { constructor(numbers) { this.numbers = null; this.ASset = false; - this._instanceIndex = 0; + this._valid = null; if (["string", "number"].includes(typeof(numbers))) { this.numbers = [ numbers ]; @@ -57,23 +57,27 @@ export class AS { }; isValid () { - return this.numbers.length > 0 && - this.numbers - .every(asn => { + if (this._valid === null) { + this._valid = this.numbers.length > 0 && + this.numbers + .every(asn => { - try { - const intAsn = parseInt(asn); - if (intAsn != asn) { + try { + const intAsn = parseInt(asn); + if (intAsn != asn) { + return false; + } + asn = intAsn; + } catch (e) { return false; } - asn = intAsn; - } catch (e) { - return false; - } - return asn > 0 && asn <= 4294967295; - }) && - [...new Set(this.numbers.map(i => parseInt(i)))].length === this.numbers.length; + return asn > 0 && asn <= 4294967295; + }) && + [...new Set(this.numbers.map(i => parseInt(i)))].length === this.numbers.length; + } + + return this._valid; }; includes (ASn){ diff --git a/src/worker.js b/src/worker.js index 35f5867..4d8f6b4 100644 --- a/src/worker.js +++ b/src/worker.js @@ -105,11 +105,19 @@ export default class Worker { if (worker){ connector.onMessage((message) => { - worker.send(message); + // worker.send(connector.name + '-' + message); + worker.send({ + connector: connector.name, + message + }); }); } else { connector.onMessage((message) => { - this.pubSub.publish("data", message); + // this.pubSub.publish("data", connector.name + '-' + message); + this.pubSub.publish("data", { + connector: connector.name, + message + }); }); } }