import time
import os
import importlib
import re
import copy
import yaml
from twisted.application import service
from twisted.internet import task, reactor, defer
from twisted.python import log
from tensor.protocol import riemann
class TensorService(service.Service):
    """Tensor service

    Runs timers, configures sources and manages the queue.

    ``config`` is the parsed top-level configuration dict. It is stored as
    ``self.config`` without copying, and is extended in place by any
    ``*.yml`` files found in ``config['include_path']``.
    """
    def __init__(self, config):
        self.running = 0
        self.sources = []
        # source object -> time.time() of its most recent sendEvent call
        self.lastEvents = {}
        # output name (None when unnamed) -> list of output objects
        self.outputs = {}
        # aggregation cache: event id -> (last raw metric, last event time)
        self.evCache = {}
        # source object -> [(compiled service regex, condition string), ...]
        self.critical = {}
        self.warn = {}
        self.eventCounter = 0
        self.factory = None
        self.protocol = None
        self.watchdog = None
        self.config = config

        if 'include_path' in config:
            self._loadIncludes(config['include_path'])

        # Read some config stuff
        self.debug = float(self.config.get('debug', False))
        self.ttl = float(self.config.get('ttl', 60.0))
        self.stagger = float(self.config.get('stagger', 0.2))

        # Backward compatibility: single-server settings used to build the
        # default Riemann output when no 'outputs' list is configured.
        self.server = self.config.get('server', 'localhost')
        self.port = int(self.config.get('port', 5555))
        self.proto = self.config.get('proto', 'tcp')
        self.inter = self.config.get('interval', 60.0)

        if self.debug:
            # A single parenthesised argument prints the same text on
            # Python 2 and Python 3 ("config: " followed by the repr).
            print("config: " + repr(config))

        self.setupSources(self.config)

    def _loadIncludes(self, ipath):
        """Merge every ``*.yml`` file in directory *ipath* into self.config.

        Files are processed in sorted filename order so that, when two
        files set the same scalar key, the result is deterministic (the
        later file wins).
        """
        if not os.path.exists(ipath):
            log.msg('Config Error: include_path %s does not exist' % ipath)
            return

        files = sorted(
            os.path.join(ipath, f) for f in os.listdir(ipath)
            if f.endswith('.yml')
        )
        for f in files:
            # safe_load: plain config data needs no arbitrary Python object
            # construction, and bare yaml.load() without a Loader raises
            # TypeError on PyYAML >= 6. The context manager closes the file.
            with open(f, 'rt') as fh:
                conf = yaml.safe_load(fh)
            # An empty YAML document loads as None; treat it as no settings.
            self._mergeConfig(conf or {})
            log.msg('Loaded additional configuration from %s' % f)

    def _mergeConfig(self, conf):
        """Merge one included config mapping into ``self.config`` in place.

        When both sides hold a dict, keys are merged (included values win).
        When both hold a list, the included list is appended. Otherwise the
        included value replaces (or adds) the key.
        """
        for k, v in conf.items():
            current = self.config.get(k)
            if isinstance(current, dict) and isinstance(v, dict):
                current.update(v)
            elif isinstance(current, list) and isinstance(v, list):
                current.extend(v)
            else:
                self.config[k] = v

    def _importClass(self, dotted):
        """Resolve a dotted ``package.module.Class`` path to the class object."""
        path, _, cl = dotted.rpartition('.')
        return getattr(importlib.import_module(path), cl)

    def setupOutputs(self, config):
        """Setup output processors.

        Each entry of ``config['outputs']`` names its class by dotted path
        under the ``output`` key. The class is constructed as
        ``cls(outputConfig, self)`` and ``createClient`` is scheduled on the
        next reactor iteration. When no outputs are configured, a single
        Riemann TCP/UDP output is built from the legacy
        ``server``/``port``/``proto`` settings. Outputs are grouped in
        ``self.outputs`` by their optional ``name`` (None when unnamed).
        """
        if self.proto == 'tcp':
            defaultClass = 'tensor.outputs.riemann.RiemannTCP'
        else:
            defaultClass = 'tensor.outputs.riemann.RiemannUDP'
        defaultOutput = {
            'output': defaultClass,
            'server': self.server,
            'port': self.port
        }

        for output in config.get('outputs', [defaultOutput]):
            output.setdefault('debug', self.debug)

            # Import the module and construct the output object
            outputObj = self._importClass(output['output'])(output, self)

            # Add the output to our routing hash
            self.outputs.setdefault(output.get('name', None), []).append(
                outputObj)

            # connect the output
            reactor.callLater(0, outputObj.createClient)

    def createSource(self, source):
        """Construct a source object from its config dict *source*.

        ``source['source']`` is the dotted class path. Missing ``debug``,
        ``ttl`` and ``interval`` keys are filled in place from the
        service-wide defaults. The class is called as
        ``cls(source, self.sendEvent, self)``.
        """
        sourceClass = self._importClass(source['source'])
        source.setdefault('debug', self.debug)
        source.setdefault('ttl', self.ttl)
        source.setdefault('interval', self.inter)
        return sourceClass(source, self.sendEvent, self)

    def setupTriggers(self, source, sobj):
        """Compile the ``critical``/``warning`` trigger maps for *sobj*.

        Each map goes from a service-name regex to a condition string. The
        condition is later evaluated by ``setStates`` as ``service <cond>``.
        """
        if source.get('critical'):
            self.critical[sobj] = [
                (re.compile(k), v) for k, v in source['critical'].items()
            ]

        if source.get('warning'):
            self.warn[sobj] = [
                (re.compile(k), v) for k, v in source['warning'].items()
            ]

    def setupSources(self, config):
        """Sets up source objects from the given config"""
        for source in config.get('sources', []):
            src = self.createSource(source)
            self.setupTriggers(source, src)
            self.sources.append(src)

    def _aggregateQueue(self, events):
        """Apply per-event aggregation and return the events to forward.

        Events without an ``aggregation`` callable pass through unchanged.
        For aggregated events, the first sighting of an ``ev.id()`` is only
        cached, because there is no previous point to aggregate against, and
        it is not forwarded. Later sightings have ``ev.metric`` replaced by
        ``ev.aggregation(previous_raw_metric, current_metric, time_delta)``.
        """
        queue = []
        for ev in events:
            if not ev.aggregation:
                queue.append(ev)
                continue

            evId = ev.id()
            rawMetric = ev.metric
            if evId in self.evCache:
                lastM, lastTime = self.evCache[evId]
                ev.metric = ev.aggregation(
                    lastM, rawMetric, ev.time - lastTime)
                queue.append(ev)

            # Cache the raw (pre-aggregation) value for the next delta
            self.evCache[evId] = (rawMetric, ev.time)

        return queue

    def setStates(self, source, queue):
        """Promote 'ok' events to 'warning'/'critical' via *source*'s triggers.

        Only events whose state is 'ok' on entry are examined. For each
        trigger whose regex matches ``ev.service``, the string
        ``"service <cond>"`` is evaluated with the event's metric bound to
        ``service``. Critical triggers run after warning triggers, so a
        critical match overrides a warning.
        """
        # NOTE(review): eval() runs condition strings taken from the config
        # file; this is only acceptable while that config is trusted.
        for ev in queue:
            if ev.state != 'ok':
                continue

            for k, v in self.warn.get(source, []):
                if k.match(ev.service):
                    if eval("service %s" % v, {'service': ev.metric}):
                        ev.state = 'warning'

            for k, v in self.critical.get(source, []):
                if k.match(ev.service):
                    if eval("service %s" % v, {'service': ev.metric}):
                        ev.state = 'critical'

    def routeEvent(self, source, events):
        """Hand *events* to every output named in the source's ``route``.

        ``route`` may be a single output name or a list of names. A missing
        route selects the unnamed (None) output group. Each delivery is
        scheduled with ``reactor.callLater(0, ...)``.
        """
        routes = source.config.get('route', None)
        if not isinstance(routes, list):
            routes = [routes]

        for route in routes:
            if self.debug:
                log.msg("Sending events %s to %s" % (events, route))

            if route not in self.outputs:
                # Non existent route
                log.msg('Could not route %s -> %s.' % (
                    source.config.get('service'), route))
                continue

            for output in self.outputs[route]:
                reactor.callLater(0, output.eventsReceived, events)

    def sendEvent(self, source, events):
        """Callback that all event sources call when they have a new event
        or list of events
        """
        if isinstance(events, list):
            self.eventCounter += len(events)
        else:
            self.eventCounter += 1
            events = [events]

        queue = self._aggregateQueue(events)

        if queue:
            if (source in self.critical) or (source in self.warn):
                self.setStates(source, queue)

            self.routeEvent(source, queue)

        # Recorded even when aggregation swallowed every event; the
        # watchdog uses this timestamp as proof of liveness.
        self.lastEvents[source] = time.time()

    def _startSource(self, source):
        source.startTimer()

    @defer.inlineCallbacks
    def startService(self):
        """Create outputs, start source timers (staggered) and the watchdog."""
        yield self.setupOutputs(self.config)

        if self.debug:
            log.msg("Starting service")

        stagger = 0
        # Start sources internal timers
        for source in self.sources:
            if self.debug:
                log.msg("Starting source " + source.config['service'])
            # Stagger source timers, or use per-source start_delay
            start_delay = float(source.config.get('start_delay', stagger))
            reactor.callLater(start_delay, self._startSource, source)
            stagger += self.stagger

        reactor.callLater(stagger, self.startWatchdog)
        self.running = 1

    def startWatchdog(self):
        # Start source watchdog (runs sourceWatchdog every 10 seconds)
        self.watchdog = task.LoopingCall(self.sourceWatchdog)
        self.watchdog.start(10)

    def sourceWatchdog(self):
        """Watchdog timer function.

        Recreates sources that have watchdog set to true in their
        configuration and have not generated events in 10*interval. A source
        that has never produced an event has no timestamp and is left alone.
        The replacement takes the stale source's slot in ``self.sources``,
        so it stays watched and is stopped with the service. If the restart
        fails, the old entry is kept and the restart is retried on the next
        tick.
        """
        # Replacing items by index never changes the list length, so
        # enumerating self.sources directly is safe here.
        for i, source in enumerate(self.sources):
            if not source.config.get('watchdog', False):
                continue

            last = self.lastEvents.get(source, None)
            if not last:
                continue

            sn = repr(source)
            try:
                age = time.time() - last
                if age > source.inter * 10:
                    log.msg("Trying to restart stale source %s: %ss" % (
                        sn, int(age)
                    ))
                    self.sources[i] = self._restartSource(source, sn)
            except Exception as e:
                log.msg("Could not reset source %s: %s" % (sn, e))

    def _restartSource(self, source, sn):
        """Stop *source* and return a freshly built, scheduled replacement.

        Raises whatever ``createSource`` raises. In that case no bookkeeping
        for the old source has been removed yet.
        """
        try:
            source.t.stop()
        except Exception as e:
            log.msg("Could not stop timer for %s: %s" % (sn, e))

        config = copy.deepcopy(source.config)
        newSource = self.createSource(config)
        # Re-attach warning/critical thresholds; they are keyed by the
        # source object, so the replacement would otherwise lose them.
        self.setupTriggers(config, newSource)

        # Drop bookkeeping keyed on the old object so it can be collected
        self.lastEvents.pop(source, None)
        self.critical.pop(source, None)
        self.warn.pop(source, None)

        reactor.callLater(0, self._startSource, newSource)
        return newSource

    @defer.inlineCallbacks
    def stopService(self):
        """Stop the watchdog and every output, waiting for each output's stop."""
        self.running = 0

        # LoopingCall.stop() raises if the loop is not running (e.g. already
        # stopped, or stopService called before startWatchdog fired).
        if self.watchdog and self.watchdog.running:
            self.watchdog.stop()

        for outputs in self.outputs.values():
            for output in outputs:
                yield defer.maybeDeferred(output.stop)