# Source code for tensor.utils

import signal
import json
import time
import urllib
from StringIO import StringIO

from zope.interface import implements

from twisted.internet import reactor, protocol, defer, error
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.web.client import Agent
from twisted.names import client
from twisted.python import log


class Timeout(Exception):
    """Signal that an operation did not complete within its time limit."""
class Resolver(object):
    """Helper class for reverse DNS (PTR) resolution with a TTL-based cache.

    Successful lookups are stored in ``self.recs`` as
    ``ip -> (host, ttl, fetched_at)`` and reused until ``ttl`` seconds have
    elapsed since ``fetched_at``.
    """

    def __init__(self):
        self.recs = {}
        self.resolver = client.getResolver()

    def reverseNameFromIPAddress(self, address):
        """Return the ``in-addr.arpa`` lookup name for a dotted address.

        :param address: Dotted address string, e.g. ``'1.2.3.4'``
        :type address: str.
        :returns: The octets in reverse order followed by
            ``.in-addr.arpa``, e.g. ``'4.3.2.1.in-addr.arpa'``
        """
        return '.'.join(reversed(address.split('.'))) + '.in-addr.arpa'

    def reverse(self, ip):
        """Resolve *ip* to a host name.

        :param ip: Dotted address string
        :type ip: str.
        :returns: A Deferred firing with the host name of the first answer,
            or with *ip* itself when the lookup fails or yields no answers.
        """
        def _ret(result, ip):
            # Used as both callback and errback: anything that is not a
            # usable (answers, authority, additional) tuple falls back to
            # returning the bare IP.
            host = ip
            if isinstance(result, tuple):
                answers, authority, additional = result
                # An empty answer section must not be indexed; treat it
                # like a failed lookup instead of raising IndexError.
                if isinstance(answers, list) and answers:
                    payload = answers[0].payload
                    ttl = payload.ttl
                    host = payload.name.name
                    self.recs[ip] = (host, ttl, time.time())
            return host

        if ip in self.recs:
            host, ttl, t = self.recs[ip]
            # Serve from cache while the record is still within its TTL.
            if (time.time() - t) < ttl:
                return defer.succeed(host)

        return self.resolver.lookupPointer(
            name=self.reverseNameFromIPAddress(address=ip)
        ).addCallback(_ret, ip).addErrback(_ret, ip)
class BodyReceiver(protocol.Protocol):
    """Protocol that accumulates a delivered body in an in-memory buffer.

    :param finished: Deferred fired with the buffer object (rewound to
        position 0) when the connection is lost. The loss reason is not
        inspected.
    """

    def __init__(self, finished):
        self.buffer = StringIO()
        self.finished = finished

    def dataReceived(self, buffer):
        # Append each received chunk to the accumulated body.
        self.buffer.write(buffer)

    def connectionLost(self, reason):
        # Rewind so the consumer can read() the body from the beginning.
        collected = self.buffer
        collected.seek(0)
        self.finished.callback(collected)
class StringProducer(object):
    """Body producer that writes a fixed string to an HTTP request.

    :param body: The complete request body; its length is exposed as
        ``self.length``
    """
    implements(IBodyProducer)

    def __init__(self, body):
        self.length = len(body)
        self.body = body

    def startProducing(self, consumer):
        """Write the whole body at once and return a fired Deferred."""
        consumer.write(self.body)
        done = defer.succeed(None)
        return done

    def pauseProducing(self):
        """Do nothing; the body is written in a single call."""

    def stopProducing(self):
        """Do nothing; the body is written in a single call."""
class ProcessProtocol(protocol.ProcessProtocol):
    """ProcessProtocol which supports timeouts.

    Output on stdout and stderr is buffered in memory. When the process
    ends, ``deferred`` fires with ``(out, err, exitCode)``; if the process
    was terminated by a signal (including the timeout kill) it errbacks
    with the termination reason instead.

    :param deferred: Deferred fired when the process ends
    :param timeout: Seconds after which the process is sent ``KILL``;
        ``None`` disables the timer
    """

    def __init__(self, deferred, timeout):
        self.timeout = timeout
        self.timer = None

        self.deferred = deferred
        self.outBuf = StringIO()
        self.errBuf = StringIO()
        # Route process output straight into the buffers.
        self.outReceived = self.outBuf.write
        self.errReceived = self.errBuf.write

    def processEnded(self, reason):
        # The process is gone, so a still-pending kill timer is obsolete.
        if self.timer is not None and self.timer.active():
            self.timer.cancel()

        out = self.outBuf.getvalue()
        err = self.errBuf.getvalue()

        e = reason.value
        code = e.exitCode

        if e.signal:
            self.deferred.errback(reason)
        else:
            self.deferred.callback((out, err, code))

    def connectionMade(self):
        def killIfAlive():
            # signalProcess is synchronous, so a plain function is enough.
            try:
                self.transport.signalProcess('KILL')
                log.msg('Killed source proccess: Timeout %s exceeded'
                        % self.timeout)
            except error.ProcessExitedAlready:
                pass

        # callLater cannot accept None; treat it as "no timeout".
        if self.timeout is not None:
            self.timer = reactor.callLater(self.timeout, killIfAlive)
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """Spawn a child process and return a Deferred for its outcome.

    The Deferred is handed to a :class:`ProcessProtocol`, which fires it
    when the process ends and kills the child if the timeout is exceeded.

    :param executable: Executable to run; also used as ``argv[0]``
    :type executable: str.
    :param args: Tuple of arguments appended after the executable
    :type args: tuple.
    :param env: Environment dictionary passed to ``reactor.spawnProcess``
    :type env: dict.
    :param path: Passed to ``reactor.spawnProcess`` as its ``path``
        argument
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: The Deferred given to the process protocol
    """
    result = defer.Deferred()
    argv = (executable,) + tuple(args)
    handler = ProcessProtocol(result, timeout)
    reactor.spawnProcess(handler, executable, argv, env, path)
    return result
# Optional HTTPS support: only available when Twisted's SSL layer imports.
try:
    from twisted.internet.ssl import ClientContextFactory

    class WebClientContextFactory(ClientContextFactory):
        """Context factory adapting ClientContextFactory for ``Agent``."""

        def getContext(self, hostname, port):
            # Agent passes hostname and port; the base class takes neither.
            return ClientContextFactory.getContext(self)

    SSL = True
except Exception:
    # Best effort: any failure while loading the SSL stack just disables
    # HTTPS. Unlike a bare except, KeyboardInterrupt/SystemExit propagate.
    SSL = False

# Silence the HTTP client factories' connection logging where they exist.
try:
    # Import under a private alias: this module already binds ``client`` to
    # twisted.names.client (used by Resolver) and must not rebind it.
    from twisted.web import client as _web_client
    _web_client._HTTP11ClientFactory.noisy = False
    _web_client.HTTPClientFactory.noisy = False
except (ImportError, AttributeError):
    pass


class HTTPRequest(object):
    """Small HTTP client built on ``twisted.web.client.Agent``.

    :param timeout: Seconds to wait for a response before the request is
        cancelled and fails with :class:`Timeout`; a falsy value disables
        the timeout
    :type timeout: int.
    """

    def __init__(self, timeout=120):
        self.timeout = timeout

    def abort_request(self, request):
        """Called to abort request on timeout

        :param request: The Deferred returned by ``Agent.request``
        """
        self.timedout = True
        if not request.called:
            try:
                request.cancel()
            except error.AlreadyCancelled:
                return

    @defer.inlineCallbacks
    def response(self, request):
        """Read the body of a received response.

        :param request: Despite its name, the response object delivered by
            the agent (it provides ``length`` and ``deliverBody``)
        :returns: Deferred firing with the body, or ``""`` when the
            response length is falsy
        """
        if request.length:
            d = defer.Deferred()
            request.deliverBody(BodyReceiver(d))
            b = yield d
            body = b.read()
        else:
            body = ""
        defer.returnValue(body)

    def request(self, url, method='GET', headers=None, data=None):
        """Issue an HTTP request and return a Deferred for the body.

        :param url: Target URL; a URL starting with ``https`` requires SSL
        :param method: HTTP method
        :param headers: Mapping of header name to list of values
        :type headers: dict.
        :param data: Optional request body
        :raises Exception: if HTTPS is requested but SSL is unavailable
        :returns: Deferred firing with the response body; fails with
            :class:`Timeout` when the request is cancelled by the timer
        """
        if headers is None:
            headers = {}

        self.timedout = False

        if url.startswith('https'):
            if SSL:
                agent = Agent(reactor, WebClientContextFactory())
            else:
                raise Exception('HTTPS requested but not supported')
        else:
            agent = Agent(reactor)

        request = agent.request(
            method, url, Headers(headers),
            StringProducer(data) if data else None
        )

        if self.timeout:
            timer = reactor.callLater(self.timeout, self.abort_request,
                                      request)

            def timeoutProxy(request):
                # NOTE: the timer stops once the response arrives, so
                # reading the body is not covered by the timeout.
                if timer.active():
                    timer.cancel()
                return self.response(request)

            def requestAborted(failure):
                if timer.active():
                    timer.cancel()

                # Only cancellations become Timeout; trap() re-raises
                # any other failure unchanged.
                failure.trap(defer.CancelledError,
                             error.ConnectingCancelledError)

                raise Timeout(
                    "Request took longer than %s seconds" % self.timeout)

            request.addCallback(timeoutProxy).addErrback(requestAborted)
        else:
            request.addCallback(self.response)

        return request

    def getBody(self, url, method='GET', headers=None, data=None):
        """Make an HTTP request and return the body

        A default ``User-Agent`` header is added when none is supplied.
        """
        # Work on a copy so the caller's dict is never modified.
        headers = dict(headers) if headers else {}

        if 'User-Agent' not in headers:
            headers['User-Agent'] = ['Tensor HTTP checker']

        return self.request(url, method, headers, data)

    @defer.inlineCallbacks
    def getJson(self, url, method='GET', headers=None, data=None):
        """Fetch a JSON result via HTTP

        A default ``Content-Type: application/json`` header is added when
        none is supplied.

        :returns: Deferred firing with the decoded JSON document
        """
        # Work on a copy so the caller's dict is never modified.
        headers = dict(headers) if headers else {}

        if 'Content-Type' not in headers:
            headers['Content-Type'] = ['application/json']

        body = yield self.getBody(url, method, headers, data)

        defer.returnValue(json.loads(body))