Source code for tensor.objects

import hashlib
import re
import time
import socket
import exceptions

from twisted.internet import task, defer
from twisted.python import log

[docs]class Event(object): """Tensor Event object All sources pass these to the queue, which form a proxy object to create protobuf Event objects :param state: Some sort of string < 255 chars describing the state :param service: The service name for this event :param description: A description for the event, ie. "My house is on fire!" :param metric: int or float metric for this event :param ttl: TTL (time-to-live) for this event :param tags: List of tag strings :param hostname: Hostname for the event (defaults to system fqdn) :param aggregation: Aggregation function :param attributes: A dictionary of key/value attributes for this event :param evtime: Event timestamp override """ def __init__( self, state, service, description, metric, ttl, tags=None, hostname=None, aggregation=None, evtime=None, attributes=None, type='riemann'): self.state = state self.service = service self.description = description self.metric = metric self.ttl = ttl self.tags = tags if tags is not None else [] self.attributes = attributes self.aggregation = aggregation self._type = type if evtime: self.time = evtime else: self.time = time.time() if hostname: self.hostname = hostname else: self.hostname = socket.gethostbyaddr(socket.gethostname())[0] def id(self): return self.hostname + '.' + self.service def __repr__(self): ser = ['%s=%s' % (k, repr(v)) for k,v in { 'hostname': self.hostname, 'state': self.state, 'service': self.service, 'metric': self.metric, 'ttl': self.ttl, 'tags': self.tags, 'aggregation': self.aggregation }.items()] return "<Event %s>" % (','.join(ser)) def copyWithMetric(self, m): return Event( self.state, self.service, self.description, m, self.ttl, self.tags, self.hostname, self.aggregation )
[docs]class Output(object): """Output parent class Outputs can inherit this object which provides a construct for a working output :param config: Dictionary config for this queue (usually read from the yaml configuration) :param tensor: A TensorService object for interacting with the queue manager """ def __init__(self, config, tensor): self.config = config self.tensor = tensor
[docs] def createClient(self): """Deferred which sets up the output """ pass
[docs] def eventsReceived(self): """Receives a list of events and processes them Arguments: events -- list of `tensor.objects.Event` """ pass
[docs] def stop(self): """Called when the service shuts down """ pass
[docs]class Source(object): """Source parent class Sources can inherit this object which provides a number of utility methods. :param config: Dictionary config for this queue (usually read from the yaml configuration) :param queueBack: A callback method to recieve a list of Event objects :param tensor: A TensorService object for interacting with the queue manager """ sync = False def __init__(self, config, queueBack, tensor): self.config = config self.t = task.LoopingCall(self.tick) = None self.attributes = None self.service = config['service'] self.inter = float(config['interval']) self.ttl = float(config['ttl']) if 'tags' in config: self.tags = [tag.strip() for tag in config['tags'].split(',')] else: self.tags = [] attributes = config.get("attributes") if isinstance(attributes, dict): self.attributes = attributes self.hostname = config.get('hostname') if self.hostname is None: self.hostname = socket.gethostbyaddr(socket.gethostname())[0] self.tensor = tensor self.queueBack = self._queueBack(queueBack) self.running = False def _queueBack(self, caller): return lambda events: caller(self, events)
[docs] def startTimer(self): """Starts the timer for this source""" = self.t.start(self.inter)
[docs] def stopTimer(self): """Stops the timer for this source""" = None self.t.stop()
@defer.inlineCallbacks def _get(self): event = yield defer.maybeDeferred(self.get) if self.config.get('debug', False): log.msg("[%s] Tick: %s" % (self.config['service'], event)) defer.returnValue(event) @defer.inlineCallbacks
[docs] def tick(self): """Called for every timer tick. Calls self.get which can be a deferred and passes that result back to the queueBack method Returns a deferred""" if self.sync: if self.running: defer.returnValue(None) self.running = True try: event = yield self._get() if event: self.queueBack(event) except Exception, e: log.msg("[%s] Unhandled error: %s" % (self.service, e)) self.running = False
[docs] def createEvent(self, state, description, metric, prefix=None, hostname=None, aggregation=None, evtime=None): """Creates an Event object from the Source configuration""" if prefix: service_name = self.service + "." + prefix else: service_name = self.service return Event(state, service_name, description, metric, self.ttl, hostname=hostname or self.hostname, aggregation=aggregation, evtime=evtime, tags=self.tags, attributes=self.attributes )
[docs] def createLog(self, type, data, evtime=None, hostname=None): """Creates an Event object from the Source configuration""" return Event(None, type, data, 0, self.ttl, hostname=hostname or self.hostname, evtime=evtime, tags=self.tags, type='log' )
def get(self): raise exceptions.NotImplementedError()