Source code for tensor.sources.rabbitmq

import time

from zope.interface import implementer

from twisted.internet import defer
from twisted.python import log

from tensor.interfaces import ITensorSource
from tensor.objects import Source


@implementer(ITensorSource)
[docs]class Queues(Source): """Returns Queue information for a particular vhost **Configuration arguments:** :param vhost: Vhost name :type vhost: str. **Metrics:** :(service_name).(queue).ready: Ready messages for queue :(service_name).(queue).unack: Unacknowledged messages for queue :(service_name).(queue).ready_rate: Ready rate of change per second :(service_name).(queue).unack_rate: Unacknowledge rate of change per second """ ssh = True def __init__(self, *a, **kw): Source.__init__(self, *a, **kw) self.last_t = None self.ready = {} self.unack = {} self.last_ready = 0 self.last_unack = 0 @defer.inlineCallbacks def get(self): vhost = self.config.get('vhost', '/') mqctl = self.config.get('rabbitmqctl', '/usr/sbin/rabbitmqctl') out, err, code = yield self.fork(mqctl, args=( 'list_queues', '-p', vhost, 'name', 'messages_ready', 'messages_unacknowledged' )) if code == 0: t = time.time() total_ready = 0 total_unack = 0 rows = out.strip('\n').split('\n') events = [] for row in rows: if ("..." in row): continue name, ready, unack = row.split() ready = int(ready) unack = int(unack) total_ready += ready total_unack += unack events.extend([ self.createEvent('ok', '%s unacknowledged messages: %s' % ( name, unack), unack, prefix='%s.unack' % name), self.createEvent('ok', '%s ready messages: %s' % ( name, ready), ready, prefix='%s.ready' % name) ]) if name in self.ready: last_ready = self.ready[name] last_unack = self.unack[name] rrate = (ready - last_ready)/float(t - self.last_t) urate = (unack - last_unack)/float(t - self.last_t) events.extend([ self.createEvent('ok', '%s unacknowledged rate: %0.2f' % ( name, urate), urate, prefix='%s.unack_rate' % name), self.createEvent('ok', '%s ready rate: %0.2f' % ( name, rrate), rrate, prefix='%s.ready_rate' % name) ]) self.ready[name] = ready self.unack[name] = unack if self.last_t: # Get total rates rrate = (total_ready - self.last_ready)/float(t - self.last_t) urate = (total_unack - self.last_unack)/float(t - self.last_t) events.extend([ self.createEvent('ok', 'Total unacknowledged rate: %0.2f' % urate, urate, prefix='total.unack_rate'), self.createEvent('ok', 'Total ready rate: %0.2f' % rrate, rrate, prefix='total.ready_rate'), self.createEvent('ok', 'Total unacknowledged messages: %s' % total_unack, total_unack, prefix='total.unack'), self.createEvent('ok', 'Total ready messages: %s' % total_ready, total_ready, prefix='total.ready') ]) self.last_ready = total_ready self.last_unack = total_unack self.last_t = t defer.returnValue(events) else: log.msg('Error running rabbitmqctl: ' + repr(err))