"""
.. module:: nginx
:platform: Unix
:synopsis: A source module for nginx stats
.. moduleauthor:: Colin Alston <colin@imcol.in>
"""
import time
from twisted.internet import defer, reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from zope.interface import implements
from tensor.interfaces import ITensorSource
from tensor.objects import Source
from tensor.utils import BodyReceiver, fork
from tensor.aggregators import Counter64
from tensor.logs import parsers, follower
class Nginx(Source):
"""Reads Nginx stub_status
**Configuration arguments:**
:param stats_url: URL to fetch stub_status from
:type stats_url: str.
**Metrics:**
:(service name).active: Active connections at this time
:(service name).accepts: Accepted connections
:(service name).handled: Handled connections
:(service name).requests: Total client requests
:(service name).reading: Reading requests
:(service name).writing: Writing responses
:(service name).waiting: Waiting connections
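**Example configuration** (an illustrative sketch: the ``service``,
``source`` and ``interval`` keys and the ``tensor.sources.nginx.Nginx``
path are assumptions about the surrounding Tensor YAML configuration;
only ``stats_url`` is read by this source)::

    sources:
        - service: nginx
          source: tensor.sources.nginx.Nginx
          interval: 30.0
          stats_url: http://localhost/nginx_status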
"""
implements(ITensorSource)
def _parse_nginx_stats(self, stats):
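# Expects the standard nginx stub_status layout, e.g.:
#   Active connections: 291
#   server accepts handled requests
#    16630948 16630948 31070465
#   Reading: 6 Writing: 179 Waiting: 106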
stats = stats.split('\n')
active = stats[0].split(': ')[-1]
accepts, handled, requests = stats[2].split()
_, reading, _, writing, _, waiting = stats[3].split()
metrics = {
'active': (float(active), None),
'accepts': (float(accepts), Counter64),
'requests': (float(requests), Counter64),
'handled': (float(handled), Counter64),
'reading': (float(reading), None),
'writing': (float(writing), None),
'waiting': (float(waiting), None),
}
return metrics
@defer.inlineCallbacks
def get(self):
agent = Agent(reactor)
url = self.config.get('url', self.config.get('stats_url'))
t0 = time.time()
request = yield agent.request('GET', url,
Headers({'User-Agent': ['Tensor']}),
)
events = []
if request.length:
d = defer.Deferred()
request.deliverBody(BodyReceiver(d))
b = yield d
body = b.read()
metrics = self._parse_nginx_stats(body)
for k, v in metrics.items():
metric, aggr = v
events.append(
self.createEvent('ok', 'Nginx %s' % k, metric, prefix=k,
aggregation=aggr)
)
defer.returnValue(events)
class NginxLogMetrics(Source):
"""Tails Nginx log files, parses them and returns metrics for data usage
and requests against other fields.
**Configuration arguments:**
:param log_format: Log format passed to the parser, using the same
format string as the nginx configuration definition
(default 'combined')
:type log_format: str.
:param file: Log file
:type file: str.
:param max_lines: Maximum number of log lines to read per interval to
prevent overwhelming Tensor when reading large logs
(default 2000)
:type max_lines: int.
:param resolution: Aggregate bucket resolution in seconds (default 10)
:type resolution: int.
**Metrics:**
:(service name).total_bytes: Bytes total for all requests
:(service name).total_requests: Total request count
:(service name).stats.(code).(requests|bytes): Metrics by status code
:(service name).user-agent.(agent).(requests|bytes): Metrics by user agent
:(service name).client.(ip).(requests|bytes): Metrics by client IP
:(service name).request.(request path).(requests|bytes): Metrics by request path
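**Example configuration** (an illustrative sketch: the ``service``,
``source`` and ``interval`` keys and the
``tensor.sources.nginx.NginxLogMetrics`` path are assumptions about the
surrounding Tensor YAML configuration; ``file``, ``log_format``,
``max_lines`` and ``resolution`` are the arguments documented above)::

    sources:
        - service: nginx_logs
          source: tensor.sources.nginx.NginxLogMetrics
          interval: 10.0
          file: /var/log/nginx/access.log
          log_format: combined
          max_lines: 2000
          resolution: 10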
"""
implements(ITensorSource)
# Don't allow overlapping runs
sync = True
def __init__(self, *a):
Source.__init__(self, *a)
parser = parsers.ApacheLogParser(self.config.get('log_format', 'combined'))
self.log = follower.LogFollower(self.config['file'], parser=parser.parse)
self.max_lines = int(self.config.get('max_lines', 2000))
self.bucket_res = int(self.config.get('resolution', 10))
self.bucket = 0
def _aggregate_fields(self, row, b, field, fil=None):
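# Accumulates per-field aggregates in self.st as
# {field: {value: [total_bytes, request_count]}}; reset by dumpEvents()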
f = row.get(field, None)
if f:
if fil:
f = fil(f)
if field not in self.st:
self.st[field] = {}
if f not in self.st[field]:
self.st[field][f] = [b, 1]
else:
self.st[field][f][0] += b
self.st[field][f][1] += 1
def dumpEvents(self, ts):
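# Emit the accumulated aggregates as events stamped with the bucket
# start time, then reset the counters for the next bucket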
if self.st:
events = [
self.createEvent('ok', 'Nginx bytes', self.bytes, prefix='total_bytes',
evtime=ts),
self.createEvent('ok', 'Nginx requests', self.requests,
prefix='total_requests', evtime=ts)
]
for field, block in self.st.items():
for key, vals in block.items():
bytes, requests = vals
events.extend([
self.createEvent('ok', 'Nginx %s %s bytes' % (field, key), bytes,
prefix='%s.%s.bytes' % (field, key), evtime=ts),
self.createEvent('ok', 'Nginx %s %s requests' % (field, key), requests,
prefix='%s.%s.requests' % (field, key), evtime=ts)
])
self.st = {}
self.bytes = 0
self.requests = 0
self.queueBack(events)
def got_line(self, line):
b = line.get('bytes', 0)
if b:
self.bytes += b
self.requests += 1
t = time.mktime(line['time'].timetuple())
# Calculate the time bucket for this line
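# e.g. with the default 10 second resolution, t=1420070407.3
# falls into the bucket starting at 1420070400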
bucket = (int(t) // self.bucket_res) * self.bucket_res
if self.bucket:
if (bucket != self.bucket):
self.dumpEvents(float(self.bucket))
self.bucket = bucket
else:
self.bucket = bucket
self._aggregate_fields(line, b, 'status')
self._aggregate_fields(line, b, 'client')
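# Dots in aggregated values are replaced with commas, presumably to
# avoid adding extra levels to the dotted metric path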
self._aggregate_fields(line, b, 'user-agent',
fil=lambda l: l.replace('.',',')
)
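# Reduce e.g. "GET /page.html?q=1 HTTP/1.1" to the path "/page,html"
# (query string dropped, dots replaced as above)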
self._aggregate_fields(line, b, 'request',
fil=lambda l: l.split()[1].split('?')[0].replace('.',',')
)
def get(self):
self.bytes = 0
self.requests = 0
self.st = {}
self.log.get_fn(self.got_line, max_lines=self.max_lines)
self.dumpEvents(float(self.bucket))