Source code for tensor.utils

import signal
from StringIO import StringIO

from twisted.internet import reactor, protocol, defer, error
from twisted.python import log

class BodyReceiver(protocol.Protocol):
    """Protocol that accumulates a streamed body in memory.

    Each chunk handed to :meth:`dataReceived` is appended to an in-memory
    ``StringIO``. When the connection is lost, ``finished.callback`` is
    invoked with that ``StringIO`` object itself (not its string value).

    **Arguments:**

    :finished: Object whose ``callback`` is called once with the buffer
               when the connection is lost
    """

    def __init__(self, finished):
        self.buffer = StringIO()
        self.finished = finished

    def dataReceived(self, buffer):
        # ``buffer`` is the incoming chunk; ``self.buffer`` is the sink.
        sink = self.buffer
        sink.write(buffer)

    def connectionLost(self, reason):
        # ``reason`` is not inspected: whatever was collected is delivered.
        collected = self.buffer
        self.finished.callback(collected)
class ProcessProtocol(protocol.ProcessProtocol):
    """ProcessProtocol which buffers child output and supports timeouts.

    Standard output and standard error of the child are collected in two
    in-memory buffers. When the process ends, ``deferred`` is fired:

    * with the tuple ``(out, err, exitCode)`` if the process was not
      terminated by a signal;
    * with an errback carrying the termination reason if it was terminated
      by a signal (this includes the ``KILL`` sent on timeout).

    **Arguments:**

    :deferred: Deferred fired when the process ends
    :timeout: Seconds to wait before sending ``KILL`` to the child;
              ``None`` disables the timer
    """

    def __init__(self, deferred, timeout):
        self.timeout = timeout
        self.timer = None
        self.deferred = deferred

        self.outBuf = StringIO()
        self.errBuf = StringIO()

        # Bind the receive hooks directly to the buffers' write methods so
        # incoming data is appended without an extra method call.
        self.outReceived = self.outBuf.write
        self.errReceived = self.errBuf.write

    def _killIfAlive(self):
        """Send ``KILL`` to the child when the timeout fires."""
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            # The child exited on its own before processEnded() had a chance
            # to cancel the timer; there is nothing left to kill.
            return
        log.msg('Killing source proccess: Timeout %s exceeded' % self.timeout)

    def processEnded(self, reason):
        # Stop the pending timeout so it cannot fire after the process is gone.
        timer = self.timer
        if timer is not None and timer.active():
            timer.cancel()

        out = self.outBuf.getvalue()
        err = self.errBuf.getvalue()

        status = reason.value
        code = status.exitCode

        if status.signal:
            # Terminated by a signal: report failure, buffered output is
            # not delivered in this case.
            self.deferred.errback(reason)
        else:
            self.deferred.callback((out, err, code))

    def connectionMade(self):
        # Arm the timeout as soon as the child process is connected.
        if self.timeout is not None:
            self.timer = reactor.callLater(self.timeout, self._killIfAlive)
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """fork

    Spawn ``executable`` through the reactor and return a deferred for its
    outcome. The child is driven by :class:`ProcessProtocol`, so the
    deferred fires with ``(stdout, stderr, exitCode)`` when the process
    ends, or errbacks if the process was terminated by a signal.

    **Arguments:**

    :executable: Executable
    :type executable: str.

    **Keyword arguments:**

    :args: Tuple of arguments; the executable itself is prepended as the
           first element of the argument vector
    :type args: tuple.
    :env: Environment dictionary, passed unchanged to ``spawnProcess``
    :type env: dict.
    :path: Passed unchanged to ``spawnProcess`` as its ``path`` argument
    :timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    """
    result = defer.Deferred()
    proto = ProcessProtocol(result, timeout)

    # argv[0] is the executable, followed by the caller's arguments.
    argv = (executable,) + tuple(args)
    reactor.spawnProcess(proto, executable, argv, env, path)

    return result