Source code for tensor.utils

import signal
import json
import time
import urllib
import os

try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO

try:
    from exceptions import IOError
except ImportError:
    pass

from zope.interface import implementer

from twisted.internet import reactor, protocol, defer, error
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.web.client import Agent
from twisted.names import client
from twisted.python import log

from twisted.internet.endpoints import clientFromString

class SocketyAgent(Agent):
    """HTTP ``Agent`` that always connects through one fixed endpoint.

    Whatever URL is requested, the connection is made to the endpoint
    described by ``path`` (a ``clientFromString`` description string).

    :param reactor: Reactor handed to ``Agent`` and used to build the endpoint
    :param path: Endpoint description string
    :type path: str.
    :param kwargs: Extra keyword arguments forwarded to ``Agent.__init__``
    """

    def __init__(self, reactor, path, **kwargs):
        self.path = path
        # Keep our own reference so the endpoint is built on the reactor the
        # caller supplied rather than on the module-level global one.
        self._endpointReactor = reactor
        Agent.__init__(self, reactor, **kwargs)

    def _getEndpoint(self, scheme=None, host=None, port=None):
        """Return the fixed client endpoint, ignoring the requested target.

        The arguments are accepted only for signature compatibility and are
        never used.  They are optional so the hook works whether it is called
        with ``(scheme, host, port)`` or with a single URI object.
        NOTE(review): confirm which form the deployed Twisted version uses.
        """
        return clientFromString(self._endpointReactor, self.path)

class Timeout(Exception):
    """Signals that an operation ran past its allotted time."""
class Resolver(object):
    """Helper class for reverse DNS resolution with a small TTL cache."""

    def __init__(self):
        # Import under a private alias: the module-level name ``client`` may
        # be rebound to ``twisted.web.client`` later in this file, and that
        # module has no ``getResolver``.
        from twisted.names import client as names_client

        # ip -> (host, ttl, time the answer was stored)
        self.recs = {}
        self.resolver = names_client.getResolver()

    def reverseNameFromIPAddress(self, address):
        """Build the ``in-addr.arpa`` name for a dotted IPv4 address.

        :param address: Dotted-quad address, e.g. ``'192.168.1.10'``
        :type address: str.
        :returns: The reversed name, e.g. ``'10.1.168.192.in-addr.arpa'``
        """
        return '.'.join(reversed(address.split('.'))) + '.in-addr.arpa'

    def reverse(self, ip):
        """Look up the PTR record for ``ip``.

        :param ip: Dotted-quad IPv4 address
        :type ip: str.
        :returns: A Deferred firing with the host name taken from the first
                  answer, or with ``ip`` itself when the lookup fails or
                  yields no answers.
        """
        def _ret(result, ip):
            # Registered as both callback and errback: anything that is not
            # a usable answer tuple (including a Failure) falls back to the
            # IP address itself.
            host = ip
            if isinstance(result, tuple):
                answers, authority, additional = result
                # Guard against an empty answer list explicitly instead of
                # relying on the IndexError being swallowed by the errback.
                if isinstance(answers, list) and answers:
                    payload = answers[0].payload
                    ttl = payload.ttl
                    host = payload.name.name
                    self.recs[ip] = (host, ttl, time.time())
            return host

        cached = self.recs.get(ip)
        if cached is not None:
            host, ttl, stored = cached
            # Serve from the cache only while the record's TTL is still valid
            if (time.time() - stored) < ttl:
                return defer.succeed(host)

        return self.resolver.lookupPointer(
            name=self.reverseNameFromIPAddress(address=ip)
        ).addCallback(_ret, ip).addErrback(_ret, ip)
class BodyReceiver(protocol.Protocol):
    """Simple buffering consumer for body objects.

    Collects everything delivered to it and, once the connection is lost,
    fires ``finished`` with a ``StringIO`` positioned at the start of the
    decoded body.

    :param finished: Deferred fired with the buffered body
    """

    def __init__(self, finished):
        self.finished = finished
        self.data = StringIO()
        # Raw chunks are kept as received and decoded in one go at the end:
        # decoding chunk by chunk fails when a multi-byte character is split
        # across two chunks.
        self._chunks = []

    def dataReceived(self, data):
        """Buffer one raw chunk of the body."""
        self._chunks.append(data)

    def connectionLost(self, reason):
        """Decode the buffered body and hand it to ``finished``."""
        self.data.write(b''.join(self._chunks).decode())
        self._chunks = []
        self.data.seek(0)
        self.finished.callback(self.data)
@implementer(IBodyProducer)
class StringProducer(object):
    """String producer for writing to HTTP requests.

    :param body: Request body. Text is encoded as UTF-8; ``bytes`` and
                 ``bytearray`` are used unchanged.
    """

    def __init__(self, body):
        # The consumer is written raw bytes and ``length`` must be the byte
        # count, so text is encoded up front rather than measured in
        # characters.
        if not isinstance(body, (bytes, bytearray)):
            body = body.encode('utf-8')
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        """Write the whole body at once and return an already-fired Deferred."""
        consumer.write(self.body)
        return defer.succeed(None)

    def pauseProducing(self):
        """Nothing to pause: the body is written in a single call."""
        pass

    def stopProducing(self):
        """Nothing to stop: the body is written in a single call."""
        pass
class ProcessProtocol(protocol.ProcessProtocol):
    """ProcessProtocol which supports timeouts.

    :param deferred: Deferred fired when the process ends: callback with
                     ``(stdout, stderr, exit_code)``, or errback with the
                     termination reason when the process died from a signal
    :param timeout: Seconds after which the child is sent ``KILL``;
                    ``None`` disables the timer
    """

    def __init__(self, deferred, timeout):
        self.timeout = timeout
        self.timer = None
        self.deferred = deferred
        self.outBuf = StringIO()
        self.errBuf = StringIO()
        # Raw output is collected as received and decoded once at the end:
        # decoding chunk by chunk fails when a multi-byte character is split
        # across two reads.
        self._outChunks = []
        self._errChunks = []

    def outReceived(self, data):
        """Buffer a raw chunk of the child's stdout."""
        self._outChunks.append(data)

    def errReceived(self, data):
        """Buffer a raw chunk of the child's stderr."""
        self._errChunks.append(data)

    def processEnded(self, reason):
        """Stop the timer and fire the deferred with the outcome."""
        # ``active()`` is false for calls that already ran *or* were
        # cancelled, so ``cancel()`` cannot raise here.
        if self.timer is not None and self.timer.active():
            self.timer.cancel()

        self.outBuf.write(b''.join(self._outChunks).decode())
        self.errBuf.write(b''.join(self._errChunks).decode())
        self._outChunks = []
        self._errChunks = []

        out = self.outBuf.getvalue()
        err = self.errBuf.getvalue()

        e = reason.value
        code = e.exitCode

        if e.signal:
            # Terminated by a signal (including our own timeout KILL)
            self.deferred.errback(reason)
        else:
            self.deferred.callback((out, err, code))

    def connectionMade(self):
        """Arm the kill timer once the child process is running."""
        def killIfAlive():
            try:
                self.transport.signalProcess('KILL')
                log.msg('Killed source proccess: Timeout %s exceeded' % self.timeout)
            except error.ProcessExitedAlready:
                pass

        # A timeout of None means "no timeout"; 0 still fires immediately.
        if self.timeout is not None:
            self.timer = reactor.callLater(self.timeout, killIfAlive)
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """Spawn a child process and return a Deferred for its outcome.

    The process is driven by a ``ProcessProtocol``, which kills the child
    when ``timeout`` is exceeded.

    :param executable: Executable to run; also passed as ``argv[0]``
    :type executable: str.
    :param args: Tuple of arguments appended after the executable
    :type args: tuple.
    :param env: Environment dictionary passed to ``reactor.spawnProcess``
    :type env: dict.
    :param path: Passed to ``reactor.spawnProcess`` as its path argument
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: The Deferred handed to the ``ProcessProtocol``
    """
    # NOTE: the ``env={}`` default is never mutated here and is forwarded
    # as-is, so it is deliberately left unchanged.
    argv = (executable,) + tuple(args)
    result = defer.Deferred()
    reactor.spawnProcess(
        ProcessProtocol(result, timeout), executable, argv, env, path)
    return result
# Optional HTTPS support: ``SSL`` records whether a TLS context factory
# could be set up.
try:
    from twisted.internet.ssl import ClientContextFactory

    class WebClientContextFactory(ClientContextFactory):
        """Context factory adapter that ignores the hostname and port."""

        def getContext(self, hostname, port):
            return ClientContextFactory.getContext(self)

    SSL = True
except Exception:
    # Best effort: any failure to set up TLS support just disables HTTPS,
    # but KeyboardInterrupt/SystemExit are no longer swallowed.
    SSL = False

# Silence the per-connection log noise of the web client factories.
try:
    # Imported under an alias so the module-level ``client`` name (the
    # ``twisted.names`` client used for DNS lookups) is not overwritten.
    from twisted.web import client as web_client
    web_client._HTTP11ClientFactory.noisy = False
    web_client.HTTPClientFactory.noisy = False
except (ImportError, AttributeError):
    # Not every Twisted release provides both factories
    pass


class HTTPRequest(object):
    """Small HTTP client helper with a request timeout.

    :param timeout: Seconds to wait for the response headers before the
                    request is cancelled; a falsy value disables the timer
    :type timeout: int.
    """

    def __init__(self, timeout=120):
        self.timeout = timeout
        # Set by ``abort_request``; reset at the start of each ``request``
        self.timedout = False

    def abort_request(self, request):
        """Called to abort request on timeout"""
        self.timedout = True
        if not request.called:
            try:
                request.cancel()
            except error.AlreadyCancelled:
                return

    @defer.inlineCallbacks
    def response(self, request):
        """Read the body of a response object and return it as a string.

        :param request: The response object the agent's Deferred fired with
        :returns: Deferred firing with the body text, or ``""`` when the
                  response reports a falsy length
        """
        if request.length:
            d = defer.Deferred()
            request.deliverBody(BodyReceiver(d))
            b = yield d
            body = b.read()
        else:
            body = ""
        defer.returnValue(body)

    def request(self, url, method='GET', headers=None, data=None, socket=None):
        """Issue an HTTP request and return a Deferred for the body.

        :param url: URL to request
        :type url: str.
        :param method: HTTP method
        :type method: str.
        :param headers: Mapping of header name to list of values
        :type headers: dict.
        :param data: Optional request body
        :param socket: Optional endpoint description; when given the request
                       is sent through a ``SocketyAgent`` instead of a
                       regular connection
        :returns: Deferred firing with the response body
        :raises Exception: if an ``https`` URL is requested without SSL
                           support
        :raises Timeout: (through the Deferred) when the request is
                         cancelled by the timeout
        """
        self.timedout = False

        if socket:
            agent = SocketyAgent(reactor, socket)
        elif url.startswith('https'):
            if not SSL:
                raise Exception('HTTPS requested but not supported')
            agent = Agent(reactor, WebClientContextFactory())
        else:
            agent = Agent(reactor)

        d = agent.request(
            method.encode(), url.encode(),
            Headers(headers or {}),
            StringProducer(data) if data else None
        )

        if not self.timeout:
            d.addCallback(self.response)
            return d

        timer = reactor.callLater(self.timeout, self.abort_request, d)

        def timeoutProxy(response):
            # Headers arrived in time: disarm the timer, then read the body
            if timer.active():
                timer.cancel()
            return self.response(response)

        def requestAborted(failure):
            if timer.active():
                timer.cancel()

            # Only cancellations are translated; anything else is re-raised
            # by ``trap``.
            failure.trap(defer.CancelledError,
                         error.ConnectingCancelledError)

            raise Timeout(
                "Request took longer than %s seconds" % self.timeout)

        d.addCallback(timeoutProxy).addErrback(requestAborted)
        return d

    def getBody(self, url, method='GET', headers=None, data=None, socket=None):
        """Make an HTTP request and return the body.

        A default ``User-Agent`` is added when none is supplied.  The
        caller's ``headers`` mapping is copied, never modified.
        """
        headers = dict(headers or {})
        if 'User-Agent' not in headers:
            headers['User-Agent'] = ['Tensor HTTP checker']

        return self.request(url, method, headers, data, socket)

    @defer.inlineCallbacks
    def getJson(self, url, method='GET', headers=None, data=None, socket=None):
        """Fetch a JSON result via HTTP.

        A default ``Content-Type: application/json`` is added when none is
        supplied.  The caller's ``headers`` mapping is copied, never
        modified.

        :returns: Deferred firing with the decoded JSON document
        """
        headers = dict(headers or {})
        if 'Content-Type' not in headers:
            headers['Content-Type'] = ['application/json']

        body = yield self.getBody(url, method, headers, data, socket)

        defer.returnValue(json.loads(body))
class PersistentCache(object):
    """A very basic dictionary cache abstraction backed by a JSON file.

    Entries are kept in ``self.store`` as ``(timestamp, value)`` pairs and
    mirrored to the file at ``location``.

    Not to be used for large amounts of data or high concurrency.

    :param location: Path of the JSON cache file
    :type location: str.
    """

    def __init__(self, location='/var/lib/tensor/cache'):
        self.store = {}
        self.location = location
        self.mtime = 0
        self._read()

    def _changed(self):
        """Return True when the cache file should be re-read.

        NOTE(review): ``self.mtime`` is never updated after construction, so
        this is effectively "the file exists".  That is left as is on
        purpose: file timestamps can be too coarse to detect two writes in
        quick succession, so always re-reading is the safer behaviour.
        """
        try:
            return os.stat(self.location).st_mtime != self.mtime
        except OSError:
            # Missing (or unreadable) file: nothing to reload
            return False

    def _acquire_cache(self):
        """Load the cache file; a missing or unparsable file yields ``{}``."""
        try:
            with open(self.location, 'r') as cache_file:
                cache = json.load(cache_file)
        except IOError:
            return {}
        except ValueError:
            # Empty or truncated file, e.g. read while another writer had
            # just truncated it: treat as an empty cache rather than crash.
            return {}

        if not isinstance(cache, dict):
            return {}
        return cache

    def _write_cache(self, d):
        """Replace the cache file contents with ``d`` serialised as JSON."""
        with open(self.location, 'w') as cache_file:
            cache_file.write(json.dumps(d))

    def _persist(self):
        """Merge the in-memory store into the file contents and write back."""
        cache = self._acquire_cache()
        cache.update(self.store)
        self._write_cache(cache)

    def _read(self):
        """Merge the file contents into the in-memory store."""
        self.store.update(self._acquire_cache())

    def _remove_key(self, k):
        """Drop ``k`` from the in-memory store and from the cache file."""
        cache = self._acquire_cache()

        # Always forget the key locally, even if the file never had it
        self.store.pop(k, None)

        if k in cache:
            del cache[k]
            self._write_cache(cache)

    def expire(self, age):
        """Expire any items in the cache older than `age` seconds"""
        now = time.time()
        cache = self._acquire_cache()

        expired = [k for k, v in cache.items() if (now - v[0]) > age]

        for k in expired:
            del cache[k]
            self.store.pop(k, None)

        # Only touch the file when something was actually removed
        if expired:
            self._write_cache(cache)

    def set(self, k, v):
        """Set a key `k` to value `v`"""
        self.store[k] = (time.time(), v)
        self._persist()

    def get(self, k):
        """Return ``(modify time, contents)`` for key `k`, or None if absent"""
        if self._changed():
            self._read()

        if k in self.store:
            return tuple(self.store[k])
        return None

    def contains(self, k):
        """Return True if key `k` exists"""
        if self._changed():
            self._read()
        return k in self.store

    def delete(self, k):
        """Remove key `k` from the cache"""
        self._remove_key(k)