import time
from zope.interface import implementer
from twisted.internet import defer
from tensor.interfaces import ITensorSource
from tensor.objects import Source
from tensor.utils import fork
from tensor.aggregators import Counter64
@implementer(ITensorSource)
class LoadAverage(Source):
    """Reports system load average for the current host

    **Metrics:**

    :(service name): Load average
    """

    def _parse_loadaverage(self, data):
        """Build a load-average event from the text of ``/proc/loadavg``.

        :param data: Raw contents of ``/proc/loadavg``. Only the first
            whitespace-separated field (the 1-minute load average, per
            proc(5)) is used.
        :returns: A single ``ok`` event carrying that value as a float.
        """
        la1 = data.split()[0]
        return self.createEvent('ok', 'Load average', float(la1))

    def get(self):
        """Read ``/proc/loadavg`` on the local host and return its event."""
        # Context manager so the descriptor is released deterministically
        # instead of whenever the file object happens to be collected.
        with open('/proc/loadavg', 'rt') as loadavg:
            data = loadavg.read()
        return self._parse_loadaverage(data)

    @defer.inlineCallbacks
    def sshGet(self):
        """Read ``/proc/loadavg`` through ``self.fork`` and return its event.

        :raises Exception: carrying the command's stderr if it exits non-zero.
        """
        loadavg, err, code = yield self.fork('/bin/cat /proc/loadavg')
        if code == 0:
            defer.returnValue(self._parse_loadaverage(loadavg))
        else:
            raise Exception(err)
@implementer(ITensorSource)
class DiskIO(Source):
    """Reports disk IO statistics per device

    **Configuration arguments:**

    :param devices: List of devices to check (optional)
    :type devices: list.

    **Metrics:**

    :(service name).(device name).reads: Number of completed reads
    :(service name).(device name).read_bytes: Bytes read per second
    :(service name).(device name).read_latency: Disk read latency
    :(service name).(device name).writes: Number of completed writes
    :(service name).(device name).write_bytes: Bytes written per second
    :(service name).(device name).write_latency: Disk write latency
    """

    def __init__(self, *a, **kw):
        Source.__init__(self, *a, **kw)
        # Optional list of device paths (e.g. "/dev/sda") to report on;
        # when unset, every non-loop, non-ram device is reported.
        self.devices = self.config.get('devices')
        # Previous sample per device name: (reads, writes, read_ms, write_ms).
        self.tcache = {}
        # Last computed read / write latency per device name, reused when
        # no new operations completed since the previous sample.
        self.trc = {}
        self.twc = {}

    @staticmethod
    def _latency(d_ops, d_ms, cache, name):
        """Return the mean time per operation for one device and direction.

        :param d_ops: Operations completed since the previous sample.
        :param d_ms: Milliseconds spent on those operations.
        :param cache: Dict of last known latency per device; updated here.
        :param name: Device name used as the cache key.
        :returns: ``d_ms / d_ops`` when ``d_ops > 0``; otherwise the last
            cached value for ``name``, or None if there is none yet.
        """
        if d_ops > 0:
            cache[name] = d_ms / float(d_ops)
        return cache.get(name)

    def _parse_stats(self, stats):
        """Convert ``/proc/diskstats`` lines into events.

        :param stats: Iterable of text lines from ``/proc/diskstats``.
        :returns: List of events. For each reported device, latency events
            (only when a latency is known) are followed by Counter64 events
            for reads, read bytes, writes and write bytes.
        """
        events = []
        for line in stats:
            parts = line.strip().split()
            # Expect major, minor, name and the 11 classic counters. Newer
            # kernels append discard (4.18+) and flush (5.5+) counters,
            # which are ignored here; blank or short lines are skipped.
            if len(parts) < 14:
                continue
            n = parts[2]
            # Filter things we don't care about
            if n.startswith(('loop', 'ram')):
                continue
            dname = "/dev/" + n
            if self.devices and (dname not in self.devices):
                continue

            (reads, _read_merged, read_sec, read_t,
             writes, _write_merged, write_sec, write_t) = [
                int(i) for i in parts[3:11]]

            # Average latency of operations completed since the previous
            # sample; on a device's first sample the deltas are the
            # lifetime totals.
            last_r, last_w, last_rt, last_wt = self.tcache.get(
                n, (0, 0, 0, 0))
            read_lat = self._latency(reads - last_r, read_t - last_rt,
                                     self.trc, n)
            write_lat = self._latency(writes - last_w, write_t - last_wt,
                                      self.twc, n)
            self.tcache[n] = (reads, writes, read_t, write_t)

            # Compare against None so a genuine 0.0 latency is still sent.
            if read_lat is not None:
                events.append(self.createEvent(
                    'ok', 'Read latency (ms)', read_lat,
                    prefix='%s.read_latency' % dname))
            if write_lat is not None:
                events.append(self.createEvent(
                    'ok', 'Write latency (ms)', write_lat,
                    prefix='%s.write_latency' % dname))

            # Sector counts in /proc/diskstats are in 512-byte units.
            events.extend([
                self.createEvent('ok', 'Reads', reads,
                    prefix='%s.reads' % dname, aggregation=Counter64),
                self.createEvent('ok', 'Read Bps', read_sec * 512,
                    prefix='%s.read_bytes' % dname, aggregation=Counter64),
                self.createEvent('ok', 'Writes', writes,
                    prefix='%s.writes' % dname, aggregation=Counter64),
                self.createEvent('ok', 'Write Bps', write_sec * 512,
                    prefix='%s.write_bytes' % dname, aggregation=Counter64),
            ])
        return events

    @defer.inlineCallbacks
    def sshGet(self):
        """Read ``/proc/diskstats`` through ``self.fork`` and parse it.

        :raises Exception: carrying the command's stderr if it exits non-zero.
        """
        diskstats, err, code = yield self.fork('/bin/cat /proc/diskstats')
        if code == 0:
            stats = diskstats.strip('\n').split('\n')
            defer.returnValue(
                self._parse_stats(stats))
        else:
            raise Exception(err)

    def _getstats(self):
        """Return the lines of the local ``/proc/diskstats``."""
        with open('/proc/diskstats', 'rt') as diskstats:
            stats = diskstats.read()
        return stats.strip('\n').split('\n')

    def get(self):
        """Read and parse the local ``/proc/diskstats``."""
        stats = self._getstats()
        return self._parse_stats(stats)
@implementer(ITensorSource)
class CPU(Source):
    """Reports system CPU utilisation as a percentage/100

    **Metrics:**

    :(service name): Percentage CPU utilisation
    :(service name).(type): Percentage CPU utilisation by type
    """
    # Names of the counters on the aggregate "cpu" line of /proc/stat,
    # in column order.
    cols = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
            'softirq', 'steal', 'guest', 'guest_nice']

    def __init__(self, *a, **kw):
        # Forward keyword arguments too, matching the other sources.
        Source.__init__(self, *a, **kw)
        # Counters from the previous sample; None until the first sample.
        self.cpu = None

    def _read_proc_stat(self):
        """Return the first line of /proc/stat without its newline."""
        with open('/proc/stat', 'rt') as procstat:
            return procstat.readline().strip('\n')

    def _calculate_metrics(self, stat):
        """Turn a /proc/stat "cpu" line into utilisation fractions.

        The first call only records a baseline and returns None. Later calls
        return ``[(None, overall), (col, fraction), ...]`` computed from the
        counter deltas since the previously recorded sample, or None when
        the total did not change (the baseline is then left as it was).

        :param stat: The "cpu ..." line: a label followed by counters.
        """
        ncols = len(self.cols)
        cpu = [int(i) for i in stat.split()[1:]]
        # Older kernels omit trailing columns (steal, guest, guest_nice...),
        # so zero-pad to the full width before unpacking.
        cpu = (cpu + [0] * ncols)[:ncols]
        (user, nice, system, idle, iowait, irq,
         softirq, steal, guest, guest_nice) = cpu
        usage = user + nice + system + irq + softirq + steal
        total = usage + iowait + idle

        if self.cpu is None:
            # No initial values, so set them and return no events.
            self.cpu = cpu
            self.prev_total = total
            self.prev_usage = usage
            return None

        total_diff = total - self.prev_total
        if total_diff == 0:
            return None

        span = float(total_diff)
        metrics = [(None, (usage - self.prev_usage) / span)]
        metrics.extend((name, (now - prev) / span)
                       for name, now, prev in zip(self.cols, cpu, self.cpu))
        self.cpu = cpu
        self.prev_total = total
        self.prev_usage = usage
        return metrics

    def _transpose_metrics(self, metrics):
        """Convert ``_calculate_metrics`` output into events.

        Per-type events (prefixed with the column name) come first, followed
        by the overall utilisation event. Returns None when ``metrics`` is
        None or empty.
        """
        if not metrics:
            return None
        events = [
            self.createEvent('ok', 'CPU %s %s%%' % (name, int(cpu_m * 100)),
                             cpu_m, prefix=name)
            for name, cpu_m in metrics[1:]
        ]
        events.append(self.createEvent(
            'ok', 'CPU %s%%' % int(metrics[0][1] * 100), metrics[0][1]))
        return events

    @defer.inlineCallbacks
    def sshGet(self):
        """Sample the "cpu" line through ``self.fork`` and return events.

        :raises Exception: carrying the command's stderr if it exits non-zero.
        """
        procstat, err, code = yield self.fork('/usr/bin/head -n 1 /proc/stat')
        if code == 0:
            stats = self._calculate_metrics(procstat.strip('\n'))
            defer.returnValue(self._transpose_metrics(stats))
        else:
            raise Exception(err)

    def get(self):
        """Sample the local /proc/stat and return events (None at first)."""
        stat = self._read_proc_stat()
        stats = self._calculate_metrics(stat)
        return self._transpose_metrics(stats)
@implementer(ITensorSource)
class Memory(Source):
    """Reports system memory utilisation as a percentage/100

    **Metrics:**

    :(service name): Percentage memory utilisation
    """

    def _parse_stats(self, mem):
        """Compute memory utilisation from ``/proc/meminfo`` lines.

        Buffers and page cache are counted as free, so the reported value is
        ``(MemTotal - MemFree - Buffers - Cached) / MemTotal``.

        :param mem: Iterable of ``/proc/meminfo`` lines (``Key: value ...``).
        :returns: A single ``ok`` event with utilisation as a fraction.
        :raises KeyError: if one of the required keys is missing.
        """
        dat = {}
        for line in mem:
            fields = line.replace(':', '').split()
            # Tolerate blank/malformed lines instead of failing to unpack.
            if len(fields) < 2:
                continue
            dat[fields[0]] = int(fields[1])
        free = dat['MemFree'] + dat['Buffers'] + dat['Cached']
        total = dat['MemTotal']
        used = total - free
        return self.createEvent('ok', 'Memory %s/%s' % (used, total),
                                used / float(total))

    def get(self):
        """Read and parse the local ``/proc/meminfo``."""
        # Context manager so the file is closed once parsing finishes.
        with open('/proc/meminfo', 'rt') as mem:
            return self._parse_stats(mem)

    @defer.inlineCallbacks
    def sshGet(self):
        """Read ``/proc/meminfo`` through ``self.fork`` and parse it.

        :raises Exception: carrying the command's stderr if it exits non-zero.
        """
        mem, err, code = yield self.fork('/bin/cat /proc/meminfo')
        if code == 0:
            defer.returnValue(self._parse_stats(mem.strip('\n').split('\n')))
        else:
            raise Exception(err)
@implementer(ITensorSource)
class DiskFree(Source):
    """Returns the free space for all mounted filesystems

    **Configuration arguments:**

    :param disks: List of devices to check (optional)
    :type disks: list.

    **Metrics:**

    :(service name).(device).used: Used space (%)
    :(service name).(device).bytes: Used space (kbytes)
    :(service name).(device).free: Free space (kbytes)
    """
    # NOTE(review): presumably marks get() as usable for remote hosts since
    # it only shells out through self.fork -- confirm against Source.
    ssh = True

    @defer.inlineCallbacks
    def get(self):
        """Run ``df -lPx tmpfs`` and return usage events per filesystem.

        :returns: (via Deferred) a list with three events per filesystem.
        :raises Exception: carrying df's stderr when df exits non-zero and
            produced no usable rows.
        """
        disks = self.config.get('disks')
        out, err, code = yield self.fork('/bin/df', args=('-lPx', 'tmpfs',))
        # Drop the header line. Split at most five times so a mount point
        # containing spaces stays intact in the last field, and ignore any
        # line that lacks the six POSIX columns.
        rows = [line.split(None, 5)
                for line in out.strip('\n').split('\n')[1:]]
        rows = [row for row in rows if len(row) == 6]
        # df may exit non-zero yet still list the filesystems it could
        # read, so only fail when nothing usable came back.
        if code != 0 and not rows:
            raise Exception(err)

        events = []
        for disk, size, used, free, util, mount in rows:
            if disks and (disk not in disks):
                continue
            if disk == "udev":
                continue
            util = int(util.strip('%'))
            used = int(used)
            free = int(free)
            events.extend([
                self.createEvent('ok', 'Disk usage %s%%' % (util),
                    util, prefix="%s.used" % disk),
                self.createEvent('ok', 'Disk usage %s kB' % (used),
                    used, prefix="%s.bytes" % disk),
                self.createEvent('ok', 'Disk free %s kB' % (free),
                    free, prefix="%s.free" % disk)
            ])
        defer.returnValue(events)
@implementer(ITensorSource)
class Network(Source):
    """Returns all network interface statistics

    **Configuration arguments:**

    :param interfaces: List of interfaces to check (optional)
    :type interfaces: list.

    **Metrics:**

    :(service name).(device).tx_bytes: Bytes transmitted
    :(service name).(device).tx_packets: Packets transmitted
    :(service name).(device).tx_errors: Transmit errors
    :(service name).(device).rx_bytes: Bytes received
    :(service name).(device).rx_packets: Packets received
    :(service name).(device).rx_errors: Receive errors
    """

    def _parse_stats(self, stats):
        """Build events from the per-interface lines of ``/proc/net/dev``.

        Each line reads ``iface: <8 receive fields> <8 transmit fields>``;
        the Receive columns come first, then Transmit.

        :param stats: Interface lines (the two header lines already removed).
        :returns: List of Counter64 events, six per reported interface.
        """
        ifaces = self.config.get('interfaces')
        ev = []
        for stat in stats:
            # Split on the colon rather than on whitespace: some older
            # kernels print the first counter directly after it
            # ("eth0:12345"), and interface names cannot contain ':'.
            iface, sep, counters = stat.partition(':')
            if not sep:
                continue
            iface = iface.strip()
            if ifaces and (iface not in ifaces):
                continue
            items = counters.split()
            # Receive block: bytes, packets, errs, drop, fifo, frame, ...
            rx_bytes = int(items[0])
            rx_packets = int(items[1])
            rx_err = int(items[2])
            # Transmit block starts at the ninth counter.
            tx_bytes = int(items[8])
            tx_packets = int(items[9])
            tx_err = int(items[10])
            ev.extend([
                self.createEvent('ok',
                    'Network %s TX bytes/sec' % (iface),
                    tx_bytes, prefix='%s.tx_bytes' % iface,
                    aggregation=Counter64),
                self.createEvent('ok',
                    'Network %s TX packets/sec' % (iface),
                    tx_packets, prefix='%s.tx_packets' % iface,
                    aggregation=Counter64),
                self.createEvent('ok',
                    'Network %s TX errors/sec' % (iface),
                    tx_err, prefix='%s.tx_errors' % iface,
                    aggregation=Counter64),
                self.createEvent('ok',
                    'Network %s RX bytes/sec' % (iface),
                    rx_bytes, prefix='%s.rx_bytes' % iface,
                    aggregation=Counter64),
                self.createEvent('ok',
                    'Network %s RX packets/sec' % (iface),
                    rx_packets, prefix='%s.rx_packets' % iface,
                    aggregation=Counter64),
                self.createEvent('ok',
                    'Network %s RX errors/sec' % (iface),
                    rx_err, prefix='%s.rx_errors' % iface,
                    aggregation=Counter64),
            ])
        return ev

    def _readStats(self):
        """Return the interface lines of the local ``/proc/net/dev``."""
        with open('/proc/net/dev', 'rt') as proc_dev:
            data = proc_dev.read()
        # The first two lines are column headers.
        return data.strip('\n').split('\n')[2:]

    @defer.inlineCallbacks
    def sshGet(self):
        """Read ``/proc/net/dev`` through ``self.fork`` and parse it.

        :raises Exception: carrying the command's stderr if it exits non-zero.
        """
        net, err, code = yield self.fork('/bin/cat /proc/net/dev')
        if code == 0:
            defer.returnValue(self._parse_stats(net.strip('\n').split('\n')[2:]))
        else:
            raise Exception(err)

    def get(self):
        """Read and parse the local ``/proc/net/dev``."""
        return self._parse_stats(self._readStats())