import time
import sys
import os
import importlib
import re
import copy
import yaml
from twisted.application import service
from twisted.internet import task, reactor, defer
from twisted.python import log
from tensor.protocol import riemann
[docs]class TensorService(service.Service):
""" Tensor service
Runs timers, configures sources and and manages the queue
"""
def __init__(self, config):
self.running = 0
self.sources = []
self.lastEvents = {}
self.outputs = {}
self.evCache = {}
self.critical = {}
self.warn = {}
self.eventCounter = 0
self.factory = None
self.protocol = None
self.watchdog = None
self.config = config
both = lambda i1, i2, t: isinstance(i1, t) and isinstance(i2, t)
if os.path.exists('/var/lib/tensor'):
sys.path.append('/var/lib/tensor')
if 'include_path' in config:
ipath = config['include_path']
if os.path.exists(ipath):
files = [
os.path.join(ipath, f) for f in os.listdir(ipath)
if f[-4:] == '.yml'
]
for f in files:
conf = yaml.load(open(f, 'rt'))
for k,v in conf.items():
if k in self.config:
if both(v, self.config[k], dict):
# Merge dicts
for k2, v2 in v.items():
self.config[k][j2] = v2
elif both(v, self.config[k], list):
# Extend lists
self.config[k].extend(v)
else:
# Overwrite
self.config[k] = v
else:
self.config[k] = v
log.msg('Loadded additional configuration from %s' % f)
else:
log.msg('Config Error: include_path %s does not exist' % ipath)
# Read some config stuff
self.debug = float(self.config.get('debug', False))
self.ttl = float(self.config.get('ttl', 60.0))
self.stagger = float(self.config.get('stagger', 0.2))
# Backward compatibility
self.server = self.config.get('server', 'localhost')
self.port = int(self.config.get('port', 5555))
self.proto = self.config.get('proto', 'tcp')
self.inter = self.config.get('interval', 60.0)
if self.debug:
print "config:", repr(config)
self.setupSources(self.config)
[docs] def setupOutputs(self, config):
"""Setup output processors"""
if self.proto == 'tcp':
defaultOutput = {
'output': 'tensor.outputs.riemann.RiemannTCP',
'server': self.server,
'port': self.port
}
else:
defaultOutput = {
'output': 'tensor.outputs.riemann.RiemannUDP',
'server': self.server,
'port': self.port
}
outputs = config.get('outputs', [defaultOutput])
for output in outputs:
if not ('debug' in output):
output['debug'] = self.debug
cl = output['output'].split('.')[-1] # class
path = '.'.join(output['output'].split('.')[:-1]) # import path
# Import the module and construct the output object
outputObj = getattr(
importlib.import_module(path), cl)(output, self)
name = output.get('name', None)
# Add the output to our routing hash
if name in self.outputs:
self.outputs[name].append(outputObj)
else:
self.outputs[name] = [outputObj]
# connect the output
reactor.callLater(0, outputObj.createClient)
def createSource(self, source):
#
if source.get('path'):
path = source['path']
if path not in sys.path:
sys.path.append(path)
# Resolve the source
cl = source['source'].split('.')[-1] # class
path = '.'.join(source['source'].split('.')[:-1]) # import path
# Import the module and get the object source we care about
sourceObj = getattr(importlib.import_module(path), cl)
if not ('debug' in source):
source['debug'] = self.debug
if not ('ttl' in source.keys()):
source['ttl'] = self.ttl
if not ('interval' in source.keys()):
source['interval'] = self.inter
return sourceObj(source, self.sendEvent, self)
def setupTriggers(self, source, sobj):
if source.get('critical'):
self.critical[sobj] = [
(re.compile(k), v) for k, v in source['critical'].items()
]
if source.get('warning'):
self.warn[sobj] = [
(re.compile(k), v) for k, v in source['warning'].items()
]
[docs] def setupSources(self, config):
"""Sets up source objects from the given config"""
sources = config.get('sources', [])
for source in sources:
src = self.createSource(source)
self.setupTriggers(source, src)
self.sources.append(src)
def _aggregateQueue(self, events):
# Handle aggregation for each event
queue = []
for ev in events:
if ev.aggregation:
id = ev.id()
thisM = ev.metric
if id in self.evCache:
lastM, lastTime = self.evCache[id]
tDelta = ev.time - lastTime
m = ev.aggregation(
lastM, ev.metric, tDelta)
if m:
ev.metric = m
queue.append(ev)
self.evCache[id] = (thisM, ev.time)
else:
queue.append(ev)
return queue
def setStates(self, source, queue):
for ev in queue:
if ev.state == 'ok':
for k, v in self.warn.get(source, []):
if k.match(ev.service):
s = eval("service %s" % v, {'service': ev.metric})
if s:
ev.state = 'warning'
for k, v in self.critical.get(source, []):
if k.match(ev.service):
s = eval("service %s" % v, {'service': ev.metric})
if s:
ev.state = 'critical'
def routeEvent(self, source, events):
routes = source.config.get('route', None)
if not isinstance(routes, list):
routes = [routes]
for route in routes:
if self.debug:
log.msg("Sending events %s to %s" % (events, route))
if not route in self.outputs:
# Non existant route
log.msg('Could not route %s -> %s.' % (
source.config['service'], route))
else:
for output in self.outputs[route]:
reactor.callLater(0, output.eventsReceived, events)
[docs] def sendEvent(self, source, events):
"""Callback that all event sources call when they have a new event
or list of events
"""
if isinstance(events, list):
self.eventCounter += len(events)
else:
self.eventCounter += 1
events = [events]
queue = self._aggregateQueue(events)
if queue:
if (source in self.critical) or (source in self.warn):
self.setStates(source, queue)
self.routeEvent(source, queue)
queue = []
self.lastEvents[source] = time.time()
def _startSource(self, source):
source.startTimer()
@defer.inlineCallbacks
def startService(self):
yield self.setupOutputs(self.config)
if self.debug:
log.msg("Starting service")
stagger = 0
# Start sources internal timers
for source in self.sources:
if self.debug:
log.msg("Starting source " + source.config['service'])
# Stagger source timers, or use per-source start_delay
start_delay = float(source.config.get('start_delay', stagger))
reactor.callLater(start_delay, self._startSource, source)
stagger += self.stagger
reactor.callLater(stagger, self.startWatchdog)
self.running = 1
def startWatchdog(self):
# Start source watchdog
self.watchdog = task.LoopingCall(self.sourceWatchdog)
self.watchdog.start(10)
[docs] def sourceWatchdog(self):
"""Watchdog timer function.
Recreates sources which have not generated events in 10*interval if
they have watchdog set to true in their configuration
"""
for i, source in enumerate(self.sources):
if not source.config.get('watchdog', False):
continue
sn = repr(source)
last = self.lastEvents.get(source, None)
if last:
try:
if last < (time.time()-(source.inter*10)):
log.msg("Trying to restart stale source %s: %ss" % (
sn, int(time.time() - last)
))
s = self.sources.pop(i)
try:
s.t.stop()
except Exception, e:
log.msg("Could not stop timer for %s: %s" % (
sn, e))
config = copy.deepcopy(s.config)
del self.lastEvents[source]
del s, source
source = self.createSource(config)
reactor.callLater(0, self._startSource, source)
except Exception, e:
log.msg("Could not reset source %s: %s" % (
sn, e))
@defer.inlineCallbacks
def stopService(self):
self.running = 0
if self.watchdog:
self.watchdog.stop()
for n, outputs in self.outputs.items():
for output in outputs:
yield defer.maybeDeferred(output.stop)