Source code for tensor.utils

import signal
import json
import time
import urllib
import exceptions
import os

from StringIO import StringIO

from zope.interface import implements

from twisted.internet import reactor, protocol, defer, error
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.web.client import Agent
from twisted.names import client
from twisted.python import log

from twisted.internet.endpoints import clientFromString

class SocketyAgent(Agent):
    """HTTP ``Agent`` that always connects to one fixed client endpoint.

    :param reactor: Reactor handed on to ``Agent.__init__``.
    :param path: Endpoint description string understood by
        ``clientFromString``; every request is sent through it.
    :param kwargs: Extra keyword arguments forwarded to ``Agent.__init__``.
    """

    def __init__(self, reactor, path, **kwargs):
        self.path = path
        Agent.__init__(self, reactor, **kwargs)

    def _getEndpoint(self, scheme, host, port):
        """Return the endpoint built from ``self.path``.

        ``scheme``, ``host`` and ``port`` are accepted but not used: the
        destination comes from ``self.path`` alone.
        """
        # NOTE: ``reactor`` here is the module-level reactor import, not the
        # argument that was passed to ``__init__``.
        return clientFromString(reactor, self.path)

class Timeout(Exception):
    """Signals that an operation ran for longer than its allowed timeout."""
class Resolver(object):
    """Helper class for reverse DNS resolution.

    Successful lookups are remembered in ``self.recs`` as
    ``ip -> (host, ttl, time_cached)`` and reused until the record's TTL
    has elapsed.
    """

    def __init__(self):
        # Imported here under an unambiguous alias so the DNS client module
        # cannot be confused with (or shadowed by) ``twisted.web.client``,
        # which this module also uses.
        from twisted.names import client as dns_client

        self.recs = {}
        self.resolver = dns_client.getResolver()

    def reverseNameFromIPAddress(self, address):
        """Return the ``in-addr.arpa`` name for a dotted-quad address.

        :param address: Dotted address, e.g. ``'192.168.1.10'``
        :type address: str.
        :returns: e.g. ``'10.1.168.192.in-addr.arpa'``
        """
        return '.'.join(reversed(address.split('.'))) + '.in-addr.arpa'

    def reverse(self, ip):
        """Resolve ``ip`` to a host name.

        :param ip: Address to look up
        :type ip: str.
        :returns: A deferred firing with the host name from the first PTR
            answer, or with ``ip`` itself when the lookup fails or yields
            no usable answer.
        """
        def _ret(result, ip):
            # Used as both callback and errback: anything that is not a
            # (answers, authority, additional) tuple falls back to the IP.
            host = ip
            if isinstance(result, tuple):
                answers, authority, additional = result
                # An empty answer list also falls back to the IP.
                if isinstance(answers, list) and answers:
                    record = answers[0].payload
                    ttl = record.ttl
                    host = record.name.name
                    self.recs[ip] = (host, ttl, time.time())
            return host

        cached = self.recs.get(ip)
        if cached is not None:
            host, ttl, cached_at = cached
            if (time.time() - cached_at) < ttl:
                return defer.succeed(host)

        return self.resolver.lookupPointer(
            name=self.reverseNameFromIPAddress(address=ip)
        ).addCallback(_ret, ip).addErrback(_ret, ip)
class BodyReceiver(protocol.Protocol):
    """Protocol that collects everything it receives into a ``StringIO``.

    :param finished: Deferred fired with the buffer (rewound to the start)
        once the connection is lost.
    """

    def __init__(self, finished):
        self.buffer = StringIO()
        self.finished = finished

    def dataReceived(self, buffer):
        """Append a received chunk to the in-memory buffer."""
        self.buffer.write(buffer)

    def connectionLost(self, reason):
        """Rewind the buffer and hand it to the ``finished`` deferred."""
        collected = self.buffer
        # Rewind so the consumer reads from the first byte.
        collected.seek(0)
        self.finished.callback(collected)
class StringProducer(object):
    """Body producer that writes one in-memory string to an HTTP request.

    :param body: The complete request body; ``length`` is set to its
        ``len()``.
    """
    implements(IBodyProducer)

    def __init__(self, body):
        self.length = len(body)
        self.body = body

    def startProducing(self, consumer):
        """Write the whole body at once and return a fired deferred."""
        consumer.write(self.body)
        return defer.succeed(None)

    def pauseProducing(self):
        """Do nothing; the body is written in a single call."""

    def stopProducing(self):
        """Do nothing; there is no ongoing production to stop."""
class ProcessProtocol(protocol.ProcessProtocol):
    """ProcessProtocol which buffers output and supports timeouts.

    :param deferred: Fired with ``(out, err, exit_code)`` when the process
        ends without a signal, or errbacked with the end reason when it was
        terminated by a signal.
    :param timeout: Seconds after ``connectionMade`` at which the process
        is sent ``KILL``.
    """

    def __init__(self, deferred, timeout):
        self.deferred = deferred
        self.timeout = timeout
        self.timer = None

        self.outBuf = StringIO()
        self.errBuf = StringIO()

        # Feed stdout/stderr chunks straight into the buffers.
        self.outReceived = self.outBuf.write
        self.errReceived = self.errBuf.write

    def processEnded(self, reason):
        """Cancel the kill timer and report the outcome on the deferred."""
        pending = self.timer
        if pending and not pending.called:
            pending.cancel()

        stdout = self.outBuf.getvalue()
        stderr = self.errBuf.getvalue()

        status = reason.value
        exit_code = status.exitCode

        if status.signal:
            # Ended by a signal (including our own KILL): report a failure.
            self.deferred.errback(reason)
            return

        self.deferred.callback((stdout, stderr, exit_code))

    def connectionMade(self):
        """Arm the timer that kills the process once the timeout passes."""
        @defer.inlineCallbacks
        def killIfAlive():
            try:
                yield self.transport.signalProcess('KILL')
                log.msg('Killed source proccess: Timeout %s exceeded' % self.timeout)
            except error.ProcessExitedAlready:
                # The process finished on its own before the timer fired.
                pass

        self.timer = reactor.callLater(self.timeout, killIfAlive)
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """fork
    Spawn a child process and return a deferred for its result, killing
    the child if it outlives `timeout`.

    :param executable: Executable
    :type executable: str.
    :param args: Tuple of arguments (the executable is prepended as argv[0])
    :type args: tuple.
    :param env: Environment dictionary, passed to ``spawnProcess`` as given
    :type env: dict.
    :param path: Passed to ``spawnProcess`` as its path argument
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: Deferred wired to a :class:`ProcessProtocol`
    """
    # NOTE: ``env`` is never mutated here, so the shared default dict is
    # safe; it is kept as-is so spawnProcess receives the same value.
    finished = defer.Deferred()
    handler = ProcessProtocol(finished, timeout)
    argv = (executable,) + tuple(args)

    reactor.spawnProcess(handler, executable, argv, env, path)
    return finished
try:
    from twisted.internet.ssl import ClientContextFactory

    class WebClientContextFactory(ClientContextFactory):
        """Context factory usable by ``Agent``; ignores hostname and port."""

        def getContext(self, hostname, port):
            return ClientContextFactory.getContext(self)

    SSL = True
except Exception:
    # Best effort: without working SSL support HTTPS requests are refused
    # in HTTPRequest.request instead of breaking the import of this module.
    SSL = False

try:
    # Imported under an alias so it does not rebind the module-level name
    # ``client``, which refers to ``twisted.names.client``.
    from twisted.web import client as _web_client
    _web_client._HTTP11ClientFactory.noisy = False
    _web_client.HTTPClientFactory.noisy = False
except (ImportError, AttributeError):
    # Quietening the factories is optional; these attributes may be absent.
    pass


class HTTPRequest(object):
    """Small HTTP client helper with an optional request timeout.

    :param timeout: Seconds before an in-flight request is cancelled;
        a falsy value disables the timeout.
    """

    def __init__(self, timeout=120):
        self.timeout = timeout
        self.timedout = False

    def abort_request(self, request):
        """Called to abort request on timeout"""
        self.timedout = True
        if not request.called:
            try:
                request.cancel()
            except error.AlreadyCancelled:
                return

    @defer.inlineCallbacks
    def response(self, request):
        """Collect and return the body of a response.

        :param request: Despite its name this is used as a response object
            (its ``length`` and ``deliverBody`` are read).
        :returns: Deferred firing with the body, or ``""`` when
            ``length`` is falsy.
        """
        if request.length:
            d = defer.Deferred()
            request.deliverBody(BodyReceiver(d))
            b = yield d
            body = b.read()
        else:
            body = ""
        defer.returnValue(body)

    def request(self, url, method='GET', headers=None, data=None, socket=None):
        """Issue an HTTP request and return a deferred for its body.

        :param url: Request URL; URLs starting with ``https`` need SSL support
        :param method: HTTP method
        :param headers: Mapping of header name to list of values
        :param data: Optional request body
        :param socket: Optional endpoint string; when given the request is
            sent through a :class:`SocketyAgent` instead of to the URL host
        :raises Exception: HTTPS was requested but SSL is unavailable
        :returns: Deferred firing with the body; fails with
            :class:`Timeout` when the request is cancelled
        """
        self.timedout = False

        if socket:
            agent = SocketyAgent(reactor, socket)
        elif url[:5] == 'https':
            if not SSL:
                raise Exception('HTTPS requested but not supported')
            agent = Agent(reactor, WebClientContextFactory())
        else:
            agent = Agent(reactor)

        request = agent.request(
            method, url, Headers(headers or {}),
            StringProducer(data) if data else None
        )

        if self.timeout:
            timer = reactor.callLater(self.timeout, self.abort_request,
                                      request)

            def timeoutProxy(request):
                # Headers arrived in time: disarm the timer, read the body.
                if timer.active():
                    timer.cancel()
                return self.response(request)

            def requestAborted(failure):
                if timer.active():
                    timer.cancel()

                # Only cancellations become Timeout; trap() re-raises
                # every other failure unchanged.
                failure.trap(defer.CancelledError,
                             error.ConnectingCancelledError)

                raise Timeout(
                    "Request took longer than %s seconds" % self.timeout)

            request.addCallback(timeoutProxy).addErrback(requestAborted)
        else:
            request.addCallback(self.response)

        return request

    def getBody(self, url, method='GET', headers=None, data=None, socket=None):
        """Make an HTTP request and return the body

        A default ``User-Agent`` is added when none is supplied. The
        caller's ``headers`` mapping is copied, never modified.
        """
        # Copy so neither the caller's dict nor a shared default is changed.
        headers = dict(headers) if headers else {}

        if 'User-Agent' not in headers:
            headers['User-Agent'] = ['Tensor HTTP checker']

        return self.request(url, method, headers, data, socket)

    @defer.inlineCallbacks
    def getJson(self, url, method='GET', headers=None, data=None, socket=None):
        """Fetch a JSON result via HTTP

        A default ``Content-Type: application/json`` is added when none is
        supplied. The caller's ``headers`` mapping is copied, never modified.
        """
        headers = dict(headers) if headers else {}

        if 'Content-Type' not in headers:
            headers['Content-Type'] = ['application/json']

        body = yield self.getBody(url, method, headers, data, socket)

        defer.returnValue(json.loads(body))
class PersistentCache(object):
    """A very basic dictionary cache abstraction. Not to be used
    for large amounts of data or high concurrency

    Entries live in ``self.store`` as ``key -> (modify_time, value)`` and
    are mirrored to a JSON file at ``location``.

    :param location: Path of the JSON file backing the cache
    :type location: str.
    """

    def __init__(self, location='/var/lib/tensor/cache'):
        self.store = {}
        self.location = location
        # mtime of the cache file as of the last _read(); 0 = never read.
        self.mtime = 0
        self._read()

    def _changed(self):
        """Return True if the file's mtime differs from the last read."""
        try:
            return os.stat(self.location).st_mtime != self.mtime
        except OSError:
            # A missing (or unstat-able) file counts as unchanged.
            return False

    def _acquire_cache(self):
        """Load and return the on-disk cache, or ``{}`` if it can't be opened."""
        try:
            cache_file = open(self.location, 'r')
        except IOError:
            return {}

        # ``with`` closes the handle even when the JSON is invalid.
        with cache_file:
            return json.loads(cache_file.read())

    def _write_cache(self, d):
        """Replace the on-disk cache with ``d``."""
        # Serialise first: if ``d`` cannot be encoded, the existing file
        # must not already have been truncated by open(..., 'w').
        payload = json.dumps(d)
        with open(self.location, 'w') as cache_file:
            cache_file.write(payload)

    def _persist(self):
        """Merge ``self.store`` over the on-disk cache and write it back."""
        cache = self._acquire_cache()
        cache.update(self.store)
        self._write_cache(cache)

    def _read(self):
        """Merge the on-disk cache into ``self.store`` and note its mtime."""
        # Stat before reading: a write that lands in between then leaves a
        # newer mtime on disk and triggers another read next time.
        try:
            mtime = os.stat(self.location).st_mtime
        except OSError:
            mtime = None

        self.store.update(self._acquire_cache())

        if mtime is not None:
            self.mtime = mtime

    def _remove_key(self, k):
        """Drop ``k`` from both the on-disk cache and ``self.store``."""
        cache = self._acquire_cache()
        cache.pop(k, None)
        self.store.pop(k, None)
        self._write_cache(cache)

    def expire(self, age):
        """Expire any items in the cache older than `age` seconds"""
        now = time.time()
        cache = self._acquire_cache()

        # v[0] is the modify time recorded by set().
        expired = [k for k, v in cache.items() if (now - v[0]) > age]

        for k in expired:
            cache.pop(k, None)
            self.store.pop(k, None)

        self._write_cache(cache)

    def set(self, k, v):
        """Set a key `k` to value `v`

        If persisting fails (e.g. `v` is not JSON serialisable) the
        in-memory entry is rolled back and the error is re-raised.
        """
        missing = object()
        previous = self.store.get(k, missing)

        self.store[k] = (time.time(), v)
        try:
            self._persist()
        except Exception:
            # Undo the in-memory change so one bad value does not make
            # every later _persist() fail as well.
            if previous is missing:
                del self.store[k]
            else:
                self.store[k] = previous
            raise

    def get(self, k):
        """Returns ``(modify_time, value)`` for key `k`, or None if absent"""
        if self._changed():
            self._read()

        if k in self.store:
            return tuple(self.store[k])
        return None

    def contains(self, k):
        """Return True if key `k` exists"""
        if self._changed():
            self._read()
        return k in self.store

    def delete(self, k):
        """Remove key `k` from the cache"""
        self._remove_key(k)