Source code for tensor.sources.nginx

"""
.. module:: nginx
   :platform: Unix
   :synopsis: A source module for nginx stats

.. moduleauthor:: Colin Alston <colin@imcol.in>
"""

import time
import datetime

from twisted.internet import defer

from zope.interface import implementer

from tensor.interfaces import ITensorSource
from tensor.objects import Source

from tensor.utils import HTTPRequest, fork
from tensor.aggregators import Counter64
from tensor.logs import parsers, follower

@implementer(ITensorSource)
class Nginx(Source):
    """Reads Nginx stub_status

    **Configuration arguments:**

    :param stats_url: URL to fetch stub_status from
    :type stats_url: str.

    **Metrics:**

    :(service name).active: Active connections at this time
    :(service name).accepts: Accepted connections
    :(service name).handled: Handled connections
    :(service name).requests: Total client requests
    :(service name).reading: Reading requests
    :(service name).writing: Writing responses
    :(service name).waiting: Waiting connections
    """

    def _parse_nginx_stats(self, stats):
        stats = stats.split('\n')
        active = stats[0].split(': ')[-1]

        accepts, handled, requests = stats[2].split()

        _, reading, _, writing, _, waiting = stats[3].split()

        metrics = {
            'active': (float(active), None),
            'accepts': (float(accepts), Counter64),
            'requests': (float(requests), Counter64),
            'handled': (float(handled), Counter64),
            'reading': (float(reading), None),
            'writing': (float(writing), None),
            'waiting': (float(waiting), None),
        }

        return metrics

    @defer.inlineCallbacks
    def get(self):
        url = self.config.get('url', self.config.get('stats_url'))

        body = yield HTTPRequest().getBody(url,
            headers={'User-Agent': ['Tensor']},
        )

        events = []

        if body:
            metrics = self._parse_nginx_stats(body)

            for k, v in metrics.items():
                metric, aggr = v
                events.append(
                    self.createEvent('ok', 'Nginx %s' % k, metric,
                                     prefix=k, aggregation=aggr)
                )

        defer.returnValue(events)
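# For illustration only -- a sketch of the stub_status payload that
# _parse_nginx_stats() expects, with invented counter values. The standard
# response is four lines, which is why the method indexes stats[0],
# stats[2] and stats[3] directly:
#
#     Active connections: 291
#     server accepts handled requests
#      16630948 16630948 31070465
#     Reading: 6 Writing: 179 Waiting: 106
#
# Line 0 yields the 'active' gauge, line 2 the accepts/handled/requests
# values (tagged with the Counter64 aggregator so they are treated as
# monotonic counters rather than gauges), and line 3 the reading/writing/
# waiting gauges.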
@implementer(ITensorSource)
class NginxLogMetrics(Source):
    """Tails Nginx log files, parses them and returns metrics for data usage
    and requests against other fields.

    **Configuration arguments:**

    :param log_format: Log format passed to parser, same as the config
                       definition
    :type log_format: str.
    :param file: Log file
    :type file: str.
    :param max_lines: Maximum number of log lines to read per interval to
                      prevent overwhelming Tensor when reading large logs
                      (default 2000)
    :type max_lines: int.
    :param resolution: Aggregate bucket resolution in seconds (default 10)
    :type resolution: int.
    :param history: Read the entire file from scratch if we've never seen
                    it (default false)
    :type history: bool.

    **Metrics:**

    :(service name).total_rbytes: Bytes total for all requests
    :(service name).total_requests: Total request count
    :(service name).status.(code).(requests|rbytes): Metrics by status code
    :(service name).user-agent.(agent).(requests|rbytes): Metrics by user agent
    :(service name).client.(ip).(requests|rbytes): Metrics by client IP
    :(service name).request.(request path).(requests|rbytes): Metrics by request path
    """

    # Don't allow overlapping runs
    sync = True

    def __init__(self, *a):
        Source.__init__(self, *a)

        parser = parsers.ApacheLogParser(self.config.get('log_format',
                                                         'combined'))

        history = self.config.get('history', False)

        self.log = follower.LogFollower(self.config['file'],
            parser=parser.parse, history=history)

        self.max_lines = int(self.config.get('max_lines', 2000))
        self.bucket_res = int(self.config.get('resolution', 10))
        self.bucket = 0

    def _aggregate_fields(self, row, b, field, fil=None):
        # Accumulate [bytes, request count] per distinct value of `field`
        f = row.get(field, None)

        if f:
            if fil:
                f = fil(f)

            if field not in self.st:
                self.st[field] = {}

            if f not in self.st[field]:
                self.st[field][f] = [b, 1]
            else:
                self.st[field][f][0] += b
                self.st[field][f][1] += 1

    def dumpEvents(self, ts):
        if self.st:
            events = [
                self.createEvent('ok', 'Nginx rbytes', self.rbytes,
                                 prefix='total_rbytes', evtime=ts),
                self.createEvent('ok', 'Nginx requests', self.requests,
                                 prefix='total_requests', evtime=ts)
            ]

            for field, block in self.st.items():
                for key, vals in block.items():
                    rbytes, requests = vals
                    events.extend([
                        self.createEvent('ok',
                            'Nginx %s %s rbytes' % (field, key), rbytes,
                            prefix='%s.%s.rbytes' % (field, key), evtime=ts),
                        self.createEvent('ok',
                            'Nginx %s %s requests' % (field, key), requests,
                            prefix='%s.%s.requests' % (field, key), evtime=ts)
                    ])

            self.st = {}
            self.rbytes = 0
            self.requests = 0

            self.queueBack(events)

    def got_line(self, line):
        b = line.get('rbytes', 0)
        if b:
            self.rbytes += b

        self.requests += 1

        t = time.mktime(line['time'].timetuple())

        # Calculate the time bucket for this line
        bucket = int(int(t) / self.bucket_res) * self.bucket_res

        if self.bucket:
            if bucket != self.bucket:
                self.dumpEvents(float(self.bucket))
                self.bucket = bucket
        else:
            self.bucket = bucket

        self._aggregate_fields(line, b, 'status')
        self._aggregate_fields(line, b, 'client')
        # Dots are swapped for commas so values can't split the dot-delimited
        # metric path built in dumpEvents()
        self._aggregate_fields(line, b, 'user-agent',
            fil=lambda l: l.replace('.', ','))
        self._aggregate_fields(line, b, 'request',
            fil=lambda l: l.split()[1].split('?')[0].replace('.', ','))

    def get(self):
        self.rbytes = 0
        self.requests = 0
        self.st = {}

        self.log.get_fn(self.got_line, max_lines=self.max_lines)

        self.dumpEvents(float(self.bucket))
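# A worked example of the bucketing in got_line() above, assuming the
# default resolution of 10 seconds (the timestamp is invented):
#
#     t = 1404309863                      # time.mktime(line['time']...)
#     bucket = int(int(t) / 10) * 10      # -> 1404309860
#
# Every line stamped between ...60 and ...69 lands in the same bucket.
# dumpEvents() only fires once a line from a newer bucket arrives, so the
# final (possibly partial) bucket is flushed explicitly by get() after
# get_fn() has drained up to max_lines lines.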
@implementer(ITensorSource)
class NginxLog(Source):
    """Tails Nginx log files, parses them and returns log events for outputs
    which support them.

    **Configuration arguments:**

    :param log_format: Log format passed to parser, same as the config
                       definition (default: combined)
    :type log_format: str.
    :param file: Log file
    :type file: str.
    :param max_lines: Maximum number of log lines to read per interval to
                      prevent overwhelming Tensor when reading large logs
                      (default 2000)
    :type max_lines: int.
    """

    # Don't allow overlapping runs
    sync = True

    def __init__(self, *a):
        Source.__init__(self, *a)

        self.parser = parsers.ApacheLogParser(self.config.get('log_format',
                                                              'combined'))

        self.log = follower.LogFollower(self.config['file'],
            parser=self._parser_proxy, history=False)

        self.max_lines = int(self.config.get('max_lines', 2000))

    def got_eventlog(self, event):
        self.queueBack(event)

    def _parser_proxy(self, line):
        """Parses log lines and returns a `log` type Event object
        """
        d = self.parser.parse(line)

        t = time.mktime(d['time'].timetuple())

        d['@timestamp'] = datetime.datetime.utcfromtimestamp(t).isoformat()
        d['time'] = str(d['time'])
        d['message'] = line
        d['logname'] = self.config['file']

        return self.createLog('nginx', d, t)

    def get(self):
        self.log.get_fn(self.got_eventlog, max_lines=self.max_lines)
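# A sketch of the event data NginxLog emits for one 'combined' format line,
# assuming ApacheLogParser yields the same keys NginxLogMetrics consumes
# above (client, status, request, rbytes, user-agent, and a datetime in
# 'time'). For a line such as:
#
#     127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0"
#         200 2326 "-" "curl"
#
# _parser_proxy() keeps the parsed fields, stringifies 'time', and adds
# three fields of its own before handing the dict to createLog('nginx', ...):
#
#     d['@timestamp']   ISO 8601 UTC timestamp derived via time.mktime()
#     d['message']      the raw, unparsed log line
#     d['logname']      the configured 'file' path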