Source code for tensor.outputs.riemann

import time

from twisted.internet import reactor, defer, task
from tensor.protocol import riemann

from tensor.objects import Output


class RiemannTCP(Output):
    """Riemann TCP output

    Events handed to :meth:`eventsReceived` are buffered in ``self.events``
    and flushed to the Riemann connection by a periodic tick.

    **Configuration arguments:**

    :param server: Riemann server hostname (default: localhost)
    :type server: str.
    :param port: Riemann server port (default: 5555)
    :type port: int.
    :param maxrate: Maximum de-queue rate (0 is no limit)
    :type maxrate: int.
    :param interval: De-queue interval in seconds (default: 1.0)
    :type interval: float.
    :param pressure: Maximum backpressure (-1 is no limit)
    :type pressure: int.
    """

    def __init__(self, *a):
        Output.__init__(self, *a)
        self.events = []
        self.t = task.LoopingCall(self.tick)

        # Pending "wait for connection" poll scheduled by createClient, kept
        # so that stop() can cancel it.
        self._connectPoll = None

        self.inter = float(self.config.get('interval', 1.0))  # tick interval
        self.pressure = int(self.config.get('pressure', -1))

        maxrate = int(self.config.get('maxrate', 0))

        if maxrate > 0:
            # Clamp to at least one event per tick.  A small rate combined
            # with a short interval would otherwise truncate to 0, which
            # emptyQueue treats as "no limit" and would silently disable
            # the rate cap.
            self.queueDepth = max(1, int(maxrate * self.inter))
        else:
            self.queueDepth = None

    def createClient(self):
        """Create a TCP connection to Riemann with automatic reconnection

        :returns: a Deferred that fires with ``None`` once the factory
            exposes a protocol object and the tick loop has been started.
        """
        self.factory = riemann.RiemannClientFactory()

        server = self.config.get('server', 'localhost')
        # Coerce like the other numeric settings so a string port from the
        # configuration still works.
        port = int(self.config.get('port', 5555))

        self.connector = reactor.connectTCP(server, port, self.factory)

        d = defer.Deferred()

        def cb():
            # Wait until we have a useful proto object
            self._connectPoll = None
            if hasattr(self.factory, 'proto') and self.factory.proto:
                self.t.start(self.inter)
                d.callback(None)
            else:
                self._connectPoll = reactor.callLater(0.01, cb)

        cb()

        return d

    def stop(self):
        """Stop this client.

        Safe to call before the connection has been established: the
        pending connection poll is cancelled and the tick loop is only
        stopped if it is actually running.
        """
        poll = self._connectPoll
        self._connectPoll = None
        if poll is not None and poll.active():
            poll.cancel()

        # LoopingCall.stop() asserts when the loop was never started.
        if self.t.running:
            self.t.stop()

        self.factory.stopTrying()
        self.connector.disconnect()

    def tick(self):
        """Clock tick called every self.inter

        While a protocol is available the queue is flushed, provided the
        backpressure limit allows it.  While there is no protocol, events
        older than their ``ttl`` are dropped from the queue.
        """
        if self.factory.proto:
            # Check backpressure
            if (self.pressure < 0) or (
                    self.factory.proto.pressure <= self.pressure):
                self.emptyQueue()
        else:
            # Check queue age and expire stale events.  Build a new list
            # rather than popping while iterating, which skips the element
            # following every removed one.
            now = time.time()
            self.events = [
                e for e in self.events if not ((now - e.time) > e.ttl)
            ]

    def emptyQueue(self):
        """Remove all or self.queueDepth events from the queue and send them
        """
        if not self.events:
            return

        if self.queueDepth:
            # Remove maximum of self.queueDepth items from queue
            events = self.events[:self.queueDepth]
            self.events = self.events[self.queueDepth:]
        else:
            events = self.events
            self.events = []

        self.factory.proto.sendEvents(events)

    def eventsReceived(self, events):
        """Receives a list of events and queues them for transmission to
        Riemann on a later tick

        Arguments:
        events -- list of `tensor.objects.Event`
        """
        self.events.extend(events)
class RiemannUDP(Output):
    """Riemann UDP output (spray-and-pray mode)

    Events are forwarded as soon as they arrive; nothing is queued.

    **Configuration arguments:**

    :param server: Riemann server IP address (default: 127.0.0.1)
    :type server: str.
    :param port: Riemann server port (default: 5555)
    :type port: int.
    """

    def __init__(self, *a):
        Output.__init__(self, *a)
        # Set by createClient once the server name has been resolved.
        self.protocol = None

    def createClient(self):
        """Resolve the configured server and set up the UDP protocol

        :returns: the Deferred from ``reactor.resolve``; the protocol and
            listening port are created in its callback.
        """
        host = self.config.get('server', '127.0.0.1')
        udp_port = self.config.get('port', 5555)

        def on_resolved(address):
            self.protocol = riemann.RiemannUDP(address, udp_port)
            self.endpoint = reactor.listenUDP(0, self.protocol)

        resolving = reactor.resolve(host)
        resolving.addCallback(on_resolved)
        return resolving

    def eventsReceived(self, events):
        """Send a list of events to Riemann straight away

        Nothing is sent while no protocol has been created yet.

        Arguments:
        events -- list of `tensor.objects.Event`
        """
        if not self.protocol:
            return

        self.protocol.sendEvents(events)