Source code for tensor.outputs.elasticsearch

import time
import json

from twisted.internet import reactor, defer, task
from twisted.python import log

try:
    from OpenSSL import SSL
    from twisted.internet import ssl
except:
    SSL=None

from tensor.protocol import elasticsearch

from tensor.objects import Output

[docs]class ElasticSearchLog(Output): """ElasticSearch HTTP API output This Output transposes events to a Logstash format **Configuration arguments:** :param url: Elasticsearch URL (default: http://localhost:9200) :type url: str :param maxsize: Maximum queue backlog size (default: 250000, 0 disables) :type maxsize: int :param maxrate: Maximum rate of documents added to index (default: 100) :type maxrate: int :param interval: Queue check interval in seconds (default: 1.0) :type interval: int :param user: Optional basic auth username :type user: str :param password: Optional basic auth password :type password: str """ def __init__(self, *a): Output.__init__(self, *a) self.events = [] self.t = task.LoopingCall(self.tick) self.inter = float(self.config.get('interval', 1.0)) # tick interval self.maxsize = int(self.config.get('maxsize', 250000)) self.user = self.config.get('user') self.password = self.config.get('password') self.url = self.config.get('url', 'http://localhost:9200') maxrate = int(self.config.get('maxrate', 100)) if maxrate > 0: self.queueDepth = int(maxrate * self.inter) else: self.queueDepth = None
[docs] def createClient(self): """Sets up HTTP connector and starts queue timer """ server = self.config.get('server', 'localhost') port = int(self.config.get('port', 9200)) self.client = elasticsearch.ElasticSearch(self.url, self.user, self.password) self.t.start(self.inter)
[docs] def stop(self): """Stop this client. """ self.t.stop()
def transposeEvent(self, event): d = event.description d['type'] = event.service d['host'] = event.hostname d['tags'] = event.tags if event._type=='log': return d return None def sendEvents(self, events): return self.client.bulkIndex( [self.transposeEvent(e) for e in events]) @defer.inlineCallbacks
[docs] def tick(self): """Clock tick called every self.inter """ if self.events: if self.queueDepth and (len(self.events) > self.queueDepth): # Remove maximum of self.queueDepth items from queue events = self.events[:self.queueDepth] self.events = self.events[self.queueDepth:] else: events = self.events self.events = [] try: result = yield self.sendEvents(events) if result.get('errors', False): log.msg(repr(result)) self.events.extend(events) except Exception, e: log.msg('Could not connect to elasticsearch ' + str(e)) self.events.extend(events)
[docs] def eventsReceived(self, events): """Receives a list of events and queues them Arguments: events -- list of `tensor.objects.Event` """ # Make sure queue isn't oversized if (self.maxsize < 1) or (len(self.events) < self.maxsize): self.events.extend(events)