1
0
mirror of https://github.com/nttgin/BGPalerter.git synced 2024-05-19 06:50:08 +00:00

introduced consumers polymorphism

This commit is contained in:
Massimo Candela
2019-07-05 00:09:59 +02:00
parent 51de58e667
commit 9b9c36afeb
8 changed files with 192 additions and 89 deletions

View File

@@ -2,6 +2,7 @@
connectors:
- file: connectorRIS
name: ris
params:
url: wss://ris-live.ripe.net/v1/ws/
moreSpecific: true

View File

@@ -3,33 +3,65 @@ import env from "./env";
export default class ConnectorFactory {
constructor() {
this.disconnected = [];
this.connected = [];
this.connectors = {};
}
getConnector = (name) => {
return this.connectors[name];
};
getConnectors = () => {
return Object.keys(this.connectors).map(name => this.connectors[name]);
};
loadConnectors = () => {
if (this.disconnected.length === 0) {
this.disconnected = config.reports.map(connector => new connector.class(connector.params, env));
const connectors = Object.keys(this.connectors);
if (connectors.length === 0) {
for (let connector of env.config.connectors) {
this.connectors[connector.name] = new connector.class(connector.params, env);
}
}
};
connectConnectors = () =>
Promise.all(this.disconnected.map(connector => {
connector.connect()
.then(() => {
this.connected.push();
});
}));
new Promise((resolve, reject) => {
const connectors = this.getConnectors();
subscribeConnectors = (params) =>
if (connectors.length === 0) {
reject(new Error("No connections available"));
} else {
resolve(Promise.all(connectors
.map(connector =>
new Promise((resolve, reject) => {
connector.connect()
.then(() => {
connector.connected = true;
resolve(true);
})
.catch((error) => {
env.logger.log({
level: 'error',
message: error
});
resolve(false);
})
}))));
}
});
subscribeConnectors = (params, callback) =>
new Promise((resolve, reject) => {
if (this.connectors.length === 0) {
reject(new Error("No connectors loaded"));
} else {
const connectors = this.getConnectors();
resolve(Promise.all(this.connectors.map(connector => connector.subscribe(params))));
if (connectors.length === 0) {
reject(new Error("No connections available"));
} else {
resolve(Promise.all(connectors.map(connector => {
connector.subscribe(params);
})));
}
});

View File

@@ -1,5 +1,4 @@
export default class Connector {
constructor(params, env){
@@ -16,7 +15,7 @@ export default class Connector {
new Promise((resolve, reject) => reject(new Error('The method connect has to be implemented')));
close = () => {
error = () => {
this.logger.log({
level: 'info',
message: 'Web socket disconnected'
@@ -27,7 +26,15 @@ export default class Connector {
throw new Error('The method subscribe has to be implemented');
};
message = (message) => this.messageCallback(message);
message = (message) => {
if (this.messageCallback)
this.messageCallback(message);
};
connected = (message) => {
if (this.connectCallback)
this.connectCallback(message);
};
transform = (message) => {
throw new Error('The method transform has to be implemented');
@@ -41,7 +48,8 @@ export default class Connector {
this.messageCallback = callback;
};
onClose = (callback) => {
onError = (callback) => {
this.closeCallback = callback;
};
}

View File

@@ -1,32 +1,44 @@
import WebSocket from "ws";
import Connector from "./connector";
export default class ConnectorRIS extends Connector{
constructor(params, env) {
super(params, env);
this.ws = null;
}
connect = () => {
connect = () =>
new Promise((resolve, reject) => {
try {
// const ws = new WebSocket(this.params.url);
//
// ws.on('message', this.message);
//
// ws.on('open', () => {
//
// });
//
// ws.on('close', this.close);
this.ws = new WebSocket(this.params.url);
this.ws.on('message', this.message);
this.ws.on('close', this.error);
this.ws.on('open', () => {
resolve(true);
this.connected();
});
new Promise((resolve, reject) => reject(new Error('The method connect has to be implemented')));
};
} catch(error) {
resolve(false);
}
});
subscribe = (input) =>
new Promise((resolve, reject) => {
try {
this.ws.send(JSON.stringify({
type: "ris_subscribe",
data: this.params
}));
resolve(true);
} catch(error) {
resolve(false);
}
});
subscribe = (input) => {
ws.send(JSON.stringify({
type: "ris_subscribe",
data: this.params
}));
};
transform = (message) => {
message = message.data;

View File

@@ -0,0 +1,67 @@
import WebSocket from "ws";
export default class ConnectorRIS extends Connector{
constructor(params, env) {
super(params, env);
}
connect = () => {
// const ws = new WebSocket(this.params.url);
//
// ws.on('message', this.message);
//
// ws.on('open', () => {
//
// });
//
// ws.on('close', this.close);
new Promise((resolve, reject) => reject(new Error('The method connect has to be implemented')));
};
subscribe = (input) => {
ws.send(JSON.stringify({
type: "ris_subscribe",
data: this.params
}));
};
transform = (message) => {
message = message.data;
const components = [];
const announcements = message["announcements"] || [];
const withdrawals = message["withdrawals"] || [];
const peer = message["peer"];
const path = message["path"];
for (let announcement of announcements){
const nextHop = announcement["next_hop"];
const prefixes = announcement["prefixes"] || [];
for (let prefix of prefixes){
components.push({
type: "announcement",
prefix,
peer,
path,
originAs: path[path.length - 1],
nextHop
})
}
}
for (let prefix of withdrawals){
components.push({
type: "withdrawal",
prefix,
peer
})
}
return components;
};
}

View File

@@ -21,6 +21,8 @@ export default class Consumer {
};
handleUpdate = (data) => {
// console.log(data);
return;
const messages = this.transform(data);
for (let monitor of monitors) {
@@ -73,7 +75,7 @@ export default class Consumer {
}
return components;
}
};
}

12
env.js
View File

@@ -53,7 +53,10 @@ const logger = winston.createLogger({
level: 'info',
transports: [
transportError,
transportReports
transportReports,
new winston.transports.Console({
format: winston.format.simple()
})
]
});
@@ -85,7 +88,8 @@ config.connectors = (config.connectors || [])
return {
class: require("./connectors/" + item.file).default,
params: item.params
params: item.params,
name: item.name
};
});
@@ -96,8 +100,8 @@ vector.config = config;
vector.logger = logger;
vector.input = input;
vector.pubSub = pubSub;
vector.monitors = config.monitors.map(monitor => new monitor.class(monitor.name, monitor.channel, vector));
vector.reports = config.reports.map(report => new report.class(report.channels, vector));
// vector.monitors = config.monitors.map(monitor => new monitor.class(monitor.name, monitor.channel, vector));
// vector.reports = config.reports.map(report => new report.class(report.channels, vector));
module.exports = vector;

View File

@@ -1,65 +1,42 @@
import { config, logger } from "./env";
import { config, logger, input } from "./env";
import cluster from "cluster";
import WebSocket from "ws";
import sleep from "sleep";
import Consumer from "./consumer";
import Connector from "./connector";
import ConnectorFactory from "./connectorFactory";
if (cluster.isMaster) {
const worker = cluster.fork();
if (config.testMode){
// const update = {
// data: {
// withdrawals: ["124.40.52.0/22"],
// peer: "124.0.0.2"
// },
// type: "ris_message"
// };
const connectorFactory = new ConnectorFactory();
const update = {
data: {
announcements: [{
prefixes: ["124.40.52.0/22"],
next_hop: "124.0.0.2"
}],
peer: "124.0.0.2",
path: "1,2,3,2914".split(",")
},
type: "ris_message"
};
connectorFactory.loadConnectors();
connectorFactory.connectConnectors()
.then(() => connectorFactory.subscribeConnectors(input))
.then(() => {
const message = JSON.stringify(update);
for (const connector of connectorFactory.getConnectors()) {
connector.onMessage((message) => {
worker.send(message);
});
connector.onError(error => {
logger.log({
level: 'error',
message: error
});
});
while (true){
worker.send(message);
sleep.sleep(1);
}
} else {
const ws = new WebSocket(config.websocketDataService);
ws.on('message', (message) => {
worker.send(message);
});
ws.on('open', () => {
ws.send(JSON.stringify({
type: "ris_subscribe",
data: config.wsParams
}));
});
ws.on('close', function close() {
}
})
.catch(error => {
logger.log({
level: 'info',
message: 'Web socket disconnected'
level: 'error',
message: error
});
});
}
} else {
new Consumer();