import time
import random
from twisted.internet import reactor, defer, task
from twisted.python import log
# TLS support is optional: it needs PyOpenSSL plus Twisted's ssl module.
# When either import fails, SSL is set to None and the rest of this module
# checks that flag before touching any TLS code.
try:
    from OpenSSL import SSL
    from twisted.internet import ssl
except Exception:
    # Deliberately broader than ImportError: a broken PyOpenSSL install can
    # fail with other exception types at import time, and TLS should simply
    # be disabled in that case. Unlike a bare ``except:``, this still lets
    # KeyboardInterrupt and SystemExit propagate.
    SSL = None
from tensor.protocol import riemann
from tensor.objects import Output
if SSL:
    class ClientTLSContext(ssl.ClientContextFactory):
        """TLS client context factory that presents a client certificate.

        :param key: Path to the private key file.
        :type key: str.
        :param cert: Path to the certificate file.
        :type cert: str.
        """

        # Version-flexible handshake: negotiates the highest protocol version
        # both peers support instead of pinning the connection to TLS 1.0,
        # which is deprecated and refused by many current servers. Peers that
        # only speak TLS 1.0 can still connect.
        method = SSL.SSLv23_METHOD

        def __init__(self, key, cert):
            self.key = key
            self.cert = cert

        def getContext(self):
            """Return an OpenSSL context loaded with the configured key pair.

            NOTE(review): nothing here enables verification of the server
            certificate -- confirm whether that is intended.
            """
            ctx = ssl.ClientContextFactory.getContext(self)
            # The flexible method would otherwise also permit the broken
            # SSLv2/SSLv3 protocols.
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            ctx.use_certificate_file(self.cert)
            ctx.use_privatekey_file(self.key)
            return ctx
class RiemannTCP(Output):
    """Riemann TCP output

    Received events are buffered in ``self.events`` and flushed to Riemann
    by a ``LoopingCall`` that fires every ``interval`` seconds.

    **Configuration arguments:**

    :param server: Riemann server hostname (default: localhost)
    :type server: str.
    :param port: Riemann server port (default: 5555)
    :type port: int.
    :param failover: Enable server failover, in which case `server` may be a list
    :type failover: bool.
    :param maxrate: Maximum de-queue rate (0 is no limit)
    :type maxrate: int.
    :param maxsize: Maximum queue size (0 is no limit, default is 250000)
    :type maxsize: int.
    :param interval: De-queue interval in seconds (default: 1.0)
    :type interval: float.
    :param pressure: Maximum backpressure (-1 is no limit)
    :type pressure: int.
    :param tls: Use TLS (default false)
    :type tls: bool.
    :param cert: Host certificate path
    :type cert: str.
    :param key: Host private key path
    :type key: str.
    :param allow_nan: Send events with None metric value (default true)
    :type allow_nan: bool
    :param expire: Discard queued events older than their TTL while no
                   protocol is connected (default false)
    :type expire: bool
    """

    def __init__(self, *a):
        Output.__init__(self, *a)

        # Events waiting to be sent, oldest first.
        self.events = []
        self.t = task.LoopingCall(self.tick)

        self.inter = float(self.config.get('interval', 1.0))  # tick interval
        self.pressure = int(self.config.get('pressure', -1))
        self.maxsize = int(self.config.get('maxsize', 250000))
        self.expire = self.config.get('expire', False)
        self.allow_nan = self.config.get('allow_nan', True)

        maxrate = int(self.config.get('maxrate', 0))
        if maxrate > 0:
            # Number of events allowed per tick. Clamp to at least one:
            # a small rate combined with a short interval would otherwise
            # truncate to 0, which emptyQueue() treats as "no limit".
            self.queueDepth = max(1, int(maxrate * self.inter))
        else:
            self.queueDepth = None

        self.tls = self.config.get('tls', False)
        if self.tls:
            # Both paths are mandatory once TLS is enabled (KeyError if absent).
            self.cert = self.config['cert']
            self.key = self.config['key']

    def createClient(self):
        """Create a TCP connection to Riemann with automatic reconnection

        :returns: Deferred that fires with ``None`` once the factory exposes
                  a protocol object and the de-queue timer has been started.
        """
        server = self.config.get('server', 'localhost')
        port = self.config.get('port', 5555)
        failover = self.config.get('failover', False)

        self.factory = riemann.RiemannClientFactory(server, failover=failover)

        if failover and isinstance(server, (list, tuple)):
            initial = random.choice(server)
        else:
            # With failover enabled `server` may still be a single hostname;
            # random.choice() on a string would pick one character of it.
            initial = server

        log.msg('Connecting to Riemann on %s:%s' % (initial, port))

        d = defer.Deferred()

        if self.tls:
            if SSL:
                self.connector = reactor.connectSSL(
                    initial, port, self.factory,
                    ClientTLSContext(self.key, self.cert))
            else:
                log.msg('[FATAL] SSL support not available!'
                        ' Please install PyOpenSSL. Exiting now')
                reactor.stop()
                # No connection was attempted, so there is no protocol to
                # wait for; hand back the (never fired) deferred instead of
                # polling until the reactor goes away.
                return d
        else:
            self.connector = reactor.connectTCP(initial, port, self.factory)

        def cb():
            # Wait until we have a useful proto object
            if getattr(self.factory, 'proto', None):
                self.t.start(self.inter)
                d.callback(None)
            else:
                reactor.callLater(0.01, cb)

        cb()

        return d

    def stop(self):
        """Stop this client.
        """
        # The timer is only started once a connection has been established,
        # and LoopingCall.stop() asserts that the loop is running.
        if self.t.running:
            self.t.stop()

        # factory/connector only exist after createClient() has run.
        factory = getattr(self, 'factory', None)
        if factory is not None:
            factory.stopTrying()

        connector = getattr(self, 'connector', None)
        if connector is not None:
            connector.disconnect()

    def tick(self):
        """Clock tick called every self.inter
        """
        # NOTE(review): indentation was lost in the copy this was restored
        # from; the `elif` is taken to pair with the connection check (expire
        # while disconnected) -- confirm against the upstream source.
        if self.factory.proto:
            # Check backpressure
            if (self.pressure < 0) or (self.factory.proto.pressure <= self.pressure):
                self.emptyQueue()
        elif self.expire:
            # Check queue age and expire stale events. The list is rebuilt
            # rather than popped from during iteration, which would skip the
            # element following every removal.
            now = time.time()
            self.events = [
                e for e in self.events if not ((now - e.time) > e.ttl)
            ]

    def emptyQueue(self):
        """Remove all or self.queueDepth events from the queue
        """
        if not self.events:
            return

        if self.queueDepth and (len(self.events) > self.queueDepth):
            # Remove maximum of self.queueDepth items from queue
            events = self.events[:self.queueDepth]
            self.events = self.events[self.queueDepth:]
        else:
            events = self.events
            self.events = []

        if not self.allow_nan:
            # Events without a metric value are dropped, not re-queued.
            events = [e for e in events if e.metric is not None]

        self.factory.proto.sendEvents(events)

    def eventsReceived(self, events):
        """Receives a list of events and transmits them to Riemann

        Arguments:
        events -- list of `tensor.objects.Event`
        """
        # Make sure queue isn't oversized. The check happens before the
        # batch is added, so a batch is either queued whole or dropped whole.
        if (self.maxsize < 1) or (len(self.events) < self.maxsize):
            self.events.extend(events)
class RiemannUDP(Output):
    """Riemann UDP output (spray-and-pray mode)

    Events are handed straight to the UDP protocol; nothing is queued.

    **Configuration arguments:**

    :param server: Riemann server IP address (default: 127.0.0.1)
    :type server: str.
    :param port: Riemann server port (default: 5555)
    :type port: int.
    """

    def __init__(self, *a):
        Output.__init__(self, *a)
        # Stays None until createClient() has resolved the server address.
        self.protocol = None

    def createClient(self):
        """Resolve the server name and open the UDP endpoint to Riemann

        :returns: the Deferred from ``reactor.resolve``, with the endpoint
                  set-up chained onto it.
        """
        host = self.config.get('server', '127.0.0.1')
        port = self.config.get('port', 5555)

        def _open(address):
            # Port 0 is passed as the local port for the listening socket.
            self.protocol = riemann.RiemannUDP(address, port)
            self.endpoint = reactor.listenUDP(0, self.protocol)

        return reactor.resolve(host).addCallback(_open)

    def eventsReceived(self, events):
        """Receives a list of events and transmits them to Riemann

        Arguments:
        events -- list of `tensor.objects.Event`
        """
        if not self.protocol:
            # Not connected yet: the events are silently discarded.
            return
        self.protocol.sendEvents(events)