Add circular loop detection to MaxDepth (#15579)

* Add circular loop detection to MaxDepth

* Formatting fixes

* Remove controversial bit

* Remove the recursion on the observer code updating max depth of child devices

* Update the fast ping code to keep track of device dependencies instead of using max_depth

* Style fixes

* Add circular loop detection to MaxDepth

* Formatting fixes

* Remove controversial bit

* Update the fast ping code to keep track of device dependencies instead of using max_depth

* Style fixes

* Fix the device list

* Remove some more old lines from the ping job

* Filter parents to those that have ping enabled to ensure child devices are always trigered for alerts

* Formatting fixes

* Added code to the ping check to order the hostnames so we try to ping parent devices before children

* Formatting fixes

* Add some types

* Refine host ordering code

* Fix output and simplify lnms poller:ping command

* a bit more cleanup

* Formatting fixes

* Fixed up type for waiting on list

* Formatting fix

---------

Co-authored-by: Tony Murray <murraytony@gmail.com>
This commit is contained in:
eskyuu
2024-10-03 10:24:22 +08:00
committed by GitHub
parent 32f9effd8e
commit f649f25892
4 changed files with 159 additions and 151 deletions
+147 -129
View File
@@ -32,39 +32,39 @@ use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels; use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Collection; use Illuminate\Support\Collection;
use Illuminate\Support\Facades\Log;
use LibreNMS\Alert\AlertRules; use LibreNMS\Alert\AlertRules;
use LibreNMS\Data\Source\Fping; use LibreNMS\Data\Source\Fping;
use LibreNMS\Data\Source\FpingResponse; use LibreNMS\Data\Source\FpingResponse;
use LibreNMS\Util\Debug;
class PingCheck implements ShouldQueue class PingCheck implements ShouldQueue
{ {
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/** @var \Illuminate\Database\Eloquent\Collection<string, Device>|null List of devices keyed by hostname */ /** @var Collection<string, Device> List of devices keyed by hostname */
private $devices; private Collection $devices;
/** @var array List of device group ids to check */ /** @var array List of device group ids to check */
private $groups = []; private array $groups = [];
// working data for loop // working data for loop
/** @var Collection */ /** @var Collection */
private $tiered; private Collection $deferred;
/** @var Collection */ /** @var Collection<int, Collection<int, bool>> device id, parent devices */
private $current; private Collection $waiting_on;
private $current_tier; /** @var Collection<int, bool> */
/** @var Collection */ private Collection $processed;
private $deferred;
/** /**
* Create a new job instance. * Create a new job instance.
* *
* @param array $groups List of distributed poller groups to check * @param array $groups List of distributed poller groups to check
*/ */
public function __construct($groups = []) public function __construct(array $groups = [])
{ {
if (is_array($groups)) {
$this->groups = $groups; $this->groups = $groups;
} $this->deferred = new Collection;
$this->waiting_on = new Collection;
$this->processed = new Collection;
} }
/** /**
@@ -76,19 +76,20 @@ class PingCheck implements ShouldQueue
{ {
$ping_start = microtime(true); $ping_start = microtime(true);
$this->fetchDevices(); $ordered_hostname_list = $this->orderHostnames($this->fetchDevices());
$ordered_device_list = $this->tiered->get(1, collect())->keys()// root nodes before standalone nodes Log::info('Processing hosts in this order : ' . implode(', ', $ordered_hostname_list));
->merge($this->devices->keys())
->unique()->all();
// bulk ping and send FpingResponse's to recordData as they come in // bulk ping and send FpingResponse's to recordData as they come in
app()->make(Fping::class)->bulkPing($ordered_device_list, [$this, 'handleResponse']); app()->make(Fping::class)->bulkPing($ordered_hostname_list, [$this, 'handleResponse']);
// check for any left over devices // check for any left over devices
if ($this->deferred->isNotEmpty()) { if ($this->deferred->isNotEmpty()) {
d_echo("Leftover devices, this shouldn't happen: " . $this->deferred->flatten(1)->implode('hostname', ', ') . PHP_EOL); Log::debug("Leftover deferred devices, this shouldn't happen: " . $this->deferred->keys()->implode(', '));
d_echo('Devices left in tier: ' . collect($this->current)->implode('hostname', ', ') . PHP_EOL); }
if ($this->waiting_on->isNotEmpty()) {
Log::debug("Leftover waiting on devices, this shouldn't happen: " . $this->waiting_on->keys()->implode(', '));
} }
if (\App::runningInConsole()) { if (\App::runningInConsole()) {
@@ -96,15 +97,53 @@ class PingCheck implements ShouldQueue
} }
} }
private function fetchDevices() /**
* Get an ordered list of hostnames that we need to ping starting from devices with no parents
*/
private function orderHostnames(Collection $devices): array
{
$ordered_device_list = new Collection;
// start with root nodes (no parents)
[$current_tier_devices, $pending_children] = $devices->keyBy('device_id')->partition(fn (Device $d) => $d->parents_count === 0);
// recurse down until no children are found
while ($current_tier_devices->isNotEmpty()) {
// add current tier to the list
$ordered_device_list = $ordered_device_list->merge($current_tier_devices);
// fetch the next tier of devices
$current_tier_devices = $current_tier_devices
->pluck('children.*.device_id')->flatten() // get all direct child ids
->map(fn ($child_id) => $pending_children->pull($child_id)) // fetch and remove the device from pending if it exists
->filter(); // filter out children that are already in the list
}
// just add any left over
$ordered_device_list = $ordered_device_list->merge($pending_children);
return $ordered_device_list->map(fn (Device $device) => $device->overwrite_ip ?: $device->hostname)->all();
}
/**
* Fetch and cache all devices that we need to process
*/
private function fetchDevices(): Collection
{ {
if (isset($this->devices)) { if (isset($this->devices)) {
return $this->devices; return $this->devices;
} }
$query = Device::canPing() $query = Device::canPing()
->select(['devices.device_id', 'hostname', 'overwrite_ip', 'status', 'status_reason', 'last_ping', 'last_ping_timetaken', 'max_depth']) ->select(['devices.device_id', 'hostname', 'overwrite_ip', 'status', 'status_reason', 'last_ping', 'last_ping_timetaken'])
->orderBy('max_depth'); ->with([
'parents' => function ($q) {
$q->canPing()->select('devices.device_id');
},
'children' => function ($q) {
$q->canPing()->select('devices.device_id');
},
])
->withCount('parents');
if ($this->groups) { if ($this->groups) {
$query->whereIntegerInRaw('poller_group', $this->groups); $query->whereIntegerInRaw('poller_group', $this->groups);
@@ -114,73 +153,29 @@ class PingCheck implements ShouldQueue
return $device->overwrite_ip ?: $device->hostname; return $device->overwrite_ip ?: $device->hostname;
}); });
// working collections
$this->tiered = $this->devices->groupBy('max_depth', true);
$this->deferred = new Collection();
// start with tier 1 (the root nodes, 0 is standalone)
$this->current_tier = 1;
$this->current = $this->tiered->get($this->current_tier, collect());
if (Debug::isVerbose()) {
$this->tiered->each(function (Collection $tier, $index) {
echo "Tier $index (" . $tier->count() . '): ';
echo $tier->implode('hostname', ', ');
echo PHP_EOL;
});
}
return $this->devices; return $this->devices;
} }
/** /**
* Check if this tier is complete and move to the next tier * Record the data and run alerts if all parents have been processed
* If we moved to the next tier, check if we can report any of our deferred results
*/
private function processTier()
{
if ($this->current->isNotEmpty()) {
return;
}
$this->current_tier++; // next tier
if (! $this->tiered->has($this->current_tier)) {
// out of devices
return;
}
if (Debug::isVerbose()) {
echo "Out of devices at this tier, moving to tier $this->current_tier\n";
}
$this->current = $this->tiered->get($this->current_tier);
// update and remove devices in the current tier
foreach ($this->deferred->pull($this->current_tier, []) as $fpingResponse) {
$this->handleResponse($fpingResponse);
}
// try to process the new tier in case we took care of all the devices
$this->processTier();
}
/**
* If the device is on the current tier, record the data and remove it
* $data should have keys: hostname, status, and conditionally rtt
*/ */
public function handleResponse(FpingResponse $response): void public function handleResponse(FpingResponse $response): void
{ {
if (Debug::isVerbose()) { Log::debug("Attempting to record data for $response->host");
echo "Attempting to record data for $response->host... ";
}
$device = $this->devices->get($response->host); $device = $this->devices->get($response->host);
// process the data if this is a standalone device or in the current tier if ($device === null) {
if ($device->max_depth === 0 || $this->current->has($device->hostname)) { Log::error("Ping host from response not found $response->host");
if (Debug::isVerbose()) {
echo "Success\n"; return;
}
$waiting_on = [];
foreach ($device->parents ?? [] as $parent) {
if (! $this->processed->has($parent->device_id)) {
$waiting_on[] = $parent->device_id;
}
} }
// mark up only if snmp is not down too // mark up only if snmp is not down too
@@ -194,56 +189,79 @@ class PingCheck implements ShouldQueue
// save last_ping_timetaken and rrd data // save last_ping_timetaken and rrd data
$response->saveStats($device); $response->saveStats($device);
// mark as processed
$this->processed->put($device->device_id, true);
Log::debug("Recorded data for $device->hostname");
if (isset($type)) { // only run alert rules if status changed if (isset($type)) { // only run alert rules if status changed
echo "Device $device->hostname changed status to $type, running alerts\n"; Log::debug("Device $device->hostname changed status to $type, running alerts");
if (count($waiting_on) === 0) {
$this->runAlerts($device->device_id);
} else {
Log::debug('Alerts Deferred');
$this->deferred->put($device->device_id, $device->parents);
foreach ($waiting_on as $parent_id) {
Log::debug("Adding $device->device_id to list waiting for $parent_id");
if ($this->waiting_on->has($parent_id)) {
$child_list = $this->waiting_on->get($parent_id);
$child_list->put($device->device_id, true);
} else {
// create a new entry containing this device
$this->waiting_on->put($parent_id, collect([$device->device_id => true]));
}
}
}
}
$this->runDeferredAlerts($device->device_id);
}
/**
* Run any deferred alerts
*/
private function runDeferredAlerts(int $device_id): void
{
// check for any devices waiting on this device
if ($this->waiting_on->has($device_id)) {
$children = $this->waiting_on->get($device_id)->keys();
// Check each child to see if alerts have been deferred
foreach ($children as $child_id) {
if ($this->deferred->has($child_id)) {
// run alert if all parents have been processed
$alert_child = true;
$parents = $this->deferred->get($child_id);
foreach ($parents as $parent) {
if (! $this->processed->has($parent->device_id)) {
Log::debug("Deferring device $child_id triggered by $device_id still waiting for $parent->device_id");
$alert_child = false;
}
}
if ($alert_child) {
Log::debug("Deferred device $child_id triggered by $device_id");
$this->runAlerts($child_id);
$this->deferred->pull($child_id);
}
}
}
}
$this->waiting_on->pull($device_id);
}
/**
* run alerts for a device
*/
private function runAlerts(int $device_id): void
{
$rules = new AlertRules; $rules = new AlertRules;
$rules->runRules($device->device_id); $rules->runRules($device_id);
}
// done with this device
$this->complete($device->hostname);
d_echo("Recorded data for $device->hostname (tier $device->max_depth)\n");
} else {
if (Debug::isVerbose()) {
echo "Deferred\n";
}
$this->defer($response);
}
$this->processTier();
}
/**
* Done processing $hostname, remove it from our active data
*
* @param string $hostname
*/
private function complete($hostname)
{
$this->current->offsetUnset($hostname);
$this->deferred->each->offsetUnset($hostname);
}
/**
* Defer this data processing until all parent devices are complete
*/
private function defer(FpingResponse $response): void
{
$device = $this->devices->get($response->host);
if ($device == null) {
dd("could not find $response->host");
}
if ($this->deferred->has($device->max_depth)) {
// add this data to the proper tier, unless it already exists...
$tier = $this->deferred->get($device->max_depth);
if (! $tier->has($device->hostname)) {
$tier->put($device->hostname, $response);
}
} else {
// create a new tier containing this data
$this->deferred->put($device->max_depth, collect([$device->hostname => $response]));
}
} }
} }
-5
View File
@@ -33,11 +33,6 @@ class DeviceObserver
*/ */
public function updated(Device $device): void public function updated(Device $device): void
{ {
// handle device dependency updates
if ($device->isDirty('max_depth')) {
$device->children->each->updateMaxDepth();
}
// log up/down status changes // log up/down status changes
if ($device->isDirty(['status', 'status_reason'])) { if ($device->isDirty(['status', 'status_reason'])) {
$type = $device->status ? 'up' : 'down'; $type = $device->status ? 'up' : 'down';
+7 -1
View File
@@ -371,7 +371,13 @@ if ($options['f'] === 'recalculate_device_dependencies') {
// update all root nodes and recurse, chunk so we don't blow up // update all root nodes and recurse, chunk so we don't blow up
Device::doesntHave('parents')->with('children')->chunkById(100, function (Collection $devices) { Device::doesntHave('parents')->with('children')->chunkById(100, function (Collection $devices) {
// anonymous recursive function // anonymous recursive function
$recurse = function (Device $device) use (&$recurse) { $processed = [];
$recurse = function (Device $device) use (&$recurse, &$processed) {
// Do not process the same device 2 times
if (array_key_exists($device->device_id, $processed)) {
return;
}
$processed[$device->device_id] = true;
$device->updateMaxDepth(); $device->updateMaxDepth();
$device->children->each($recurse); $device->children->each($recurse);
+2 -13
View File
@@ -1,5 +1,6 @@
<?php <?php
use App\Jobs\PingCheck;
use Illuminate\Support\Facades\Artisan; use Illuminate\Support\Facades\Artisan;
use Symfony\Component\Process\Process; use Symfony\Component\Process\Process;
@@ -43,19 +44,7 @@ Artisan::command('update', function () {
Artisan::command('poller:ping Artisan::command('poller:ping
{groups?* : ' . __('Optional List of distributed poller groups to poll') . '} {groups?* : ' . __('Optional List of distributed poller groups to poll') . '}
', function () { ', function () {
// PingCheck::dispatch(new PingCheck($this->argument('groups'))); PingCheck::dispatch($this->argument('groups', []));
$command = [base_path('ping.php')];
if ($this->argument('groups')) {
$command[] = '-g';
$command[] = implode(',', $this->argument('groups'));
}
if (($verbosity = $this->getOutput()->getVerbosity()) >= 128) {
$command[] = '-d';
if ($verbosity >= 256) {
$command[] = '-v';
}
}
(new Process($command))->setTimeout(null)->setIdleTimeout(null)->setTty(true)->run();
})->purpose(__('Check if devices are up or down via icmp')); })->purpose(__('Check if devices are up or down via icmp'));
Artisan::command('poller:discovery Artisan::command('poller:discovery