1
0
mirror of https://github.com/nttgin/BGPalerter.git synced 2024-05-19 06:50:08 +00:00

improved reconnection feature - following #187

This commit is contained in:
Massimo Candela
2020-03-21 16:21:17 +01:00
parent 3282d30c1e
commit 400a92016e
4 changed files with 78 additions and 81 deletions

View File

@ -56,82 +56,72 @@ export default class ConnectorFactory {
}
};
connectConnectors = () =>
_connectConnector = (connector) => {
connector.onError(error => {
logger.log({
level: 'error',
message: error
});
});
connector.onConnect(message => {
logger.log({
level: 'info',
message: message
});
});
connector.onDisconnect(error => {
if (error) {
logger.log({
level: 'error',
message: error
});
} else {
logger.log({
level: 'info',
message: connector.name + ' disconnected'
});
}
});
return connector.connect()
.catch(error => {
// If not connected log the error and move on
if (error) {
logger.log({
level: 'error',
message: error
});
}
});
};
connectConnectors = (params) =>
new Promise((resolve, reject) => {
const connectors = this.getConnectors();
if (connectors.length === 0) {
reject(new Error("No connections available"));
} else {
resolve(Promise.all(connectors
.map(connector =>
new Promise((resolve, reject) => {
connector.onError(error => {
logger.log({
level: 'error',
message: error
});
});
connector.onConnect(message => {
connector.connected = true;
logger.log({
level: 'info',
message: message
});
});
connector.onDisconnect(error => {
connector.connected = false;
const calls = connectors
.map(connector => {
return this._connectConnector(connector)
.then(() => {
connector.subscribe(params);
})
.catch((error) => {
if (error) {
logger.log({
level: 'error',
message: error
});
} else {
logger.log({
level: 'info',
message: connector.name + ' disconnected'
});
}
});
})
});
connector
.connect()
.then(() => {
connector.connected = true;
resolve(true);
})
.catch((error) => {
if (error) {
env.logger.log({
level: 'error',
message: error
});
}
resolve(false);
})
}))));
resolve(Promise.all(calls));
}
});
subscribeConnectors = (params, callback) =>
new Promise((resolve, reject) => {
const connectors = this.getConnectors();
if (connectors.length === 0) {
reject(new Error("No connections available"));
} else {
const connectorList = connectors
.map(connector => connector.subscribe(params));
resolve(Promise.all(connectorList));
}
})
}

View File

@ -43,8 +43,8 @@ export default class ConnectorRIS extends Connector{
this.ws = null;
this.subscription = null;
this.pingInterval = 5000;
this.params.reconnectTimeoutSeconds = this.params.reconnectTimeoutSeconds || 10;
this.reconnectTimeout = this.params.reconnectTimeoutSeconds * 1000;
this._defaultReconnectTimeout = 10000;
this.reconnectTimeout = this._defaultReconnectTimeout;
setInterval(this._ping, this.pingInterval);
@ -77,7 +77,7 @@ export default class ConnectorRIS extends Connector{
_openConnect = (resolve) => {
this.connected = true;
resolve(true);
this.reconnectTimeout = this.params.reconnectTimeoutSeconds * 1000;
this.reconnectTimeout = this._defaultReconnectTimeout;
this._connect(this.name + ' connector connected');
};
@ -95,26 +95,37 @@ export default class ConnectorRIS extends Connector{
this.ws.on('message', this._messageToJson);
this.ws.on('close', (error) => {
const message = (this.connected) ?
"RIPE RIS disconnected (error: " + error + "). Read more at https://github.com/nttgin/BGPalerter/blob/master/docs/ris-disconnections.md" :
"It was not possible to establish a connection with RIPE RIS";
this._close(message);
if (this.connected) {
this._close("RIPE RIS disconnected (error: " + error + "). Read more at https://github.com/nttgin/BGPalerter/blob/master/docs/ris-disconnections.md");
} else {
this._close("It was not possible to establish a connection with RIPE RIS");
reject();
}
});
this.ws.on('error', this._error);
this.ws.on('open', this._openConnect.bind(null, resolve));
this.ws.on('ping', this._pingReceived);
} catch(error) {
this._error(error);
resolve(false);
reject(error);
}
});
_reconnect = () => {
this.connect()
.then(this.subscribe.bind(null, this.subscription));
.then(() => {
if (this.subscription) {
this.subscribe(this.subscription);
}
})
.catch(error => {
if (error) {
this.logger.log({
level: 'error',
message: error
});
}
});
};
_getTimeoutReconnect = () => {
@ -125,9 +136,9 @@ export default class ConnectorRIS extends Connector{
_close = (error) => {
this._disconnect(error);
try {
this.connected = false;
this.ws.terminate();
this.ws.removeAllListeners();
this.connected = false;
} catch(e) {
// Nothing to do here
}
@ -169,7 +180,6 @@ export default class ConnectorRIS extends Connector{
const params = JSON.parse(JSON.stringify(this.params.subscription));
if (monitoredPrefixes
.filter(
i => (ipUtils._isEqualPrefix(i.prefix, '0:0:0:0:0:0:0:0/0') || ipUtils._isEqualPrefix(i.prefix,'0.0.0.0/0'))

View File

@ -52,7 +52,6 @@ let config = {
file: "connectorRIS",
name: "ris",
params: {
reconnectTimeoutSeconds: 10,
carefulSubscription: true,
url: "wss://ris-live.ripe.net/v1/ws/",
perMessageDeflate: true,

View File

@ -104,9 +104,8 @@ export default class Worker {
this.config.maxMessagesPerSecond = this.config.maxMessagesPerSecond || 6000;
const buffer = new LossyBuffer(parseInt(this.config.maxMessagesPerSecond /(1000/bufferCleaningInterval)), bufferCleaningInterval, this.logger);
connectorFactory.loadConnectors();
return connectorFactory.connectConnectors()
return connectorFactory.connectConnectors(this.input)
.then(() => {
for (const connector of connectorFactory.getConnectors()) {
connector.onMessage((message) => {
@ -127,7 +126,6 @@ export default class Worker {
}
})
.then(() => connectorFactory.subscribeConnectors(this.input))
.catch(error => {
this.logger.log({
level: 'error',