Source code for libnrdpd.schedule
# Copyright 2020 Hoplite Industries, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Core scheduling and execution."""
import json
import logging
import time
import traceback
# Local imports
from . import config
from . import nrdp
from . import task as tasklib
logging.getLogger(__name__).addHandler(logging.NullHandler())
[docs]class Schedule:
"""Handle scheduling of checks
Parameters:
cfg (:class:`libnrdpd.config.Config`): Config object
Raises:
ValueError: Raised when incoming ``cfg`` is of the wrong type.
"""
def __init__(self, cfg: config.Config):
if not isinstance(cfg, config.Config):
raise ValueError(
"cfg is `%s` expected config.Config" % (type(cfg))
)
self._cfg = cfg
self._tasks = {}
self._running = {}
self._queue = []
for check in self._cfg.checks.values():
self._queue.append(tasklib.Task(check))
[docs] def sort(self):
"""Re-sort the queue for processing"""
self._queue = sorted(self._queue, key=lambda x: x.start)
[docs] def loop(self):
"""Main engine for the nrdpd daemon.
Run in a loop forever executing checks and submitting them.
"""
log = logging.getLogger("%s.Schedule.loop" % __name__)
log.debug("Start main loop")
self.sort()
host_last_sent = time.time() - 3600
while True:
# Only re-order queue on changes in status
changed = False
# Check the status of running children
# Can't iterate over a dictionary that I'm modifying within
# the iteration.
for name in list(self._running.keys()): # pylint: disable=C0201
task = self._running[name]
if task.complete:
event = {
"check": task.check.name,
"started": task.began,
"elapsed": task.elapsed,
"status": task.status,
"timeout": task.expired,
}
# Attempt to send host checks every minute or so
send_host = False
now = time.time()
if (now - host_last_sent) >= 58.0:
send_host = True
host_last_sent = now
log.error("Task complete: %s", json.dumps(event))
try:
nrdp.submit(self._cfg, task, send_host=send_host)
except Exception as err: # pylint: disable=W0703
lines = traceback.format_exc().splitlines()
for line in lines:
log.error(line)
log.error("Submission error: %s", err)
task.reset()
del self._running[name]
changed = True
if changed:
self.sort()
now = time.time()
while self._queue and self._queue[0].start < now:
task = self._queue[0]
log.info("Starting check: %s", task.check.name)
self._running[task.check.name] = task
host = task.check.host if task.check.host else self._cfg.host
template = {"host": host, "fqdn": self._cfg.fqdn}
log.debug("Template variables: %s", repr(template))
task.run(**template)
self.sort()
# Delay until the next scheduled check if nothing is running
# If stuff is running the 100% CPU usage delay is built into
# checking the running processes and thus no delay is needed
# here.
if self._queue and not self._running:
now = time.time()
sleepytime = self._queue[0].start - now
if sleepytime > 0:
log.debug(
"sleeping %0.2f till next scheduled job", sleepytime
)
time.sleep(sleepytime)