2019-06-28 03:46:48 +02:00
|
|
|
import config from "./config";
|
2019-06-29 03:35:53 +02:00
|
|
|
import pubSub from 'pubsub-js';
|
2019-06-30 01:42:58 +02:00
|
|
|
import logger from './logger';
|
2019-06-28 03:46:48 +02:00
|
|
|
|
|
|
|
export default class Consumer {
|
2019-06-29 03:35:53 +02:00
|
|
|
|
2019-06-28 03:46:48 +02:00
|
|
|
constructor(inputManager){
|
|
|
|
process.on('message', this.dispatch);
|
|
|
|
this.monitors = config.monitors.map(monitor =>
|
2019-06-29 03:35:53 +02:00
|
|
|
new monitor.class(inputManager, monitor.name, monitor.channel, config, pubSub));
|
|
|
|
|
|
|
|
this.reports = config.reports.map(report =>
|
|
|
|
new report.class(report.channels, config, pubSub));
|
2019-06-30 01:42:58 +02:00
|
|
|
|
2019-06-28 03:46:48 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
dispatch = (data) => {
|
|
|
|
try {
|
|
|
|
const message = JSON.parse(data);
|
|
|
|
switch (message.type) {
|
|
|
|
case "ris_message": this.handleUpdate(message)
|
|
|
|
}
|
2019-06-30 01:42:58 +02:00
|
|
|
} catch (error) {
|
|
|
|
// Don't do anything
|
2019-06-28 03:46:48 +02:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
handleUpdate = (data) => {
|
|
|
|
const messages = this.transform(data);
|
|
|
|
for (let monitor of this.monitors) {
|
|
|
|
|
|
|
|
// Blocking filtering to reduce stack usage
|
|
|
|
for (const message of messages.filter(monitor.filter)) {
|
|
|
|
|
|
|
|
// Promise call to reduce waiting times
|
|
|
|
monitor.monitor(message)
|
|
|
|
.catch(error => {
|
2019-06-30 01:42:58 +02:00
|
|
|
logger.log({
|
|
|
|
level: 'error',
|
|
|
|
message: error
|
|
|
|
});
|
2019-06-28 03:46:48 +02:00
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
transform = (message) => {
|
|
|
|
message = message.data;
|
|
|
|
const components = [];
|
|
|
|
const announcements = message["announcements"] || [];
|
|
|
|
const withdrawals = message["withdrawals"] || [];
|
|
|
|
const peer = message["peer"];
|
|
|
|
const path = message["path"];
|
|
|
|
|
|
|
|
for (let announcement of announcements){
|
|
|
|
const nextHop = announcement["next_hop"];
|
|
|
|
const prefixes = announcement["prefixes"] || [];
|
|
|
|
|
|
|
|
for (let prefix of prefixes){
|
|
|
|
components.push({
|
|
|
|
type: "announcement",
|
|
|
|
prefix,
|
|
|
|
peer,
|
|
|
|
path,
|
|
|
|
originAs: path[path.length - 1],
|
|
|
|
nextHop
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (let prefix of withdrawals){
|
|
|
|
components.push({
|
|
|
|
type: "withdrawal",
|
|
|
|
prefix,
|
|
|
|
peer
|
|
|
|
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return components;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|