00001
00002
00003
00004
00005
00006
00007
00008 '''Asynchronous command execution. Integrated into the GLib main loop.'''
00009
00010 import sys
00011 import os
00012 import fcntl
00013 import signal
00014 import gobject
00015
00016 import sets
00017 set = sets.Set
00018
00019
00020
00021
00022 def spawn(args, vars=None, workdir=None, channels=[]):
00023 assert type(args) == list and len(args) > 0
00024 assert not vars or type(vars) == dict
00025
00026 pipes = [os.pipe() for entry in channels]
00027
00028 pid = os.fork()
00029 if pid == 0:
00030 for fd in xrange(1024):
00031 try:
00032 fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
00033 except:
00034 pass
00035
00036 try:
00037 if vars:
00038 os.environ.update(vars)
00039
00040 if workdir:
00041 os.chdir(workdir)
00042
00043 for fileno, channel in channels:
00044 fd = pipes.pop(0)
00045
00046 if channel.output:
00047 master, slave = fd
00048 else:
00049 slave, master = fd
00050
00051 os.close(master)
00052 if slave != fileno:
00053 os.dup2(slave, fileno)
00054 os.close(slave)
00055
00056 os.execvp(args[0], args)
00057
00058 except Exception, e:
00059 try:
00060 print >>sys.stderr, '%s:' % args[0],
00061 except:
00062 pass
00063
00064 try:
00065 print >>sys.stderr, e
00066 except:
00067 pass
00068
00069 os._exit(127)
00070
00071 for fileno, channel in channels:
00072 fd = pipes.pop(0)
00073
00074 if channel.output:
00075 master, slave = fd
00076 else:
00077 slave, master = fd
00078
00079 os.close(slave)
00080 channel.open(master)
00081
00082 return pid
00083
00084
00085
00086
00087 class Channel (gobject.GObject):
00088 def __init__(self, condition=0, callback=None):
00089 gobject.GObject.__init__(self)
00090
00091 self.file = None
00092 self.condition = condition
00093 self.callback = callback
00094
00095 def open(self, file):
00096 assert not self.file
00097
00098 self.file = file
00099
00100 mask = self.condition | gobject.IO_ERR | gobject.IO_HUP
00101 self.id = gobject.io_add_watch(self.file, mask, self._watch)
00102
00103 self.emit('opened')
00104
00105 def _watch(self, file, condition):
00106 if condition & gobject.IO_ERR:
00107 close = True
00108 elif condition & gobject.IO_HUP:
00109 if condition & self.condition:
00110 self.callback(file, flush=True)
00111
00112 close = True
00113 else:
00114 if condition & self.condition:
00115 self.callback(file, flush=False)
00116
00117 close = False
00118
00119 if close:
00120 self.file = None
00121 self.id = None
00122
00123 try:
00124 file.close()
00125 except IOError:
00126 pass
00127
00128 self.emit('closed')
00129
00130 return False
00131 else:
00132 return True
00133
00134 def close(self):
00135 if self.id:
00136 try:
00137 gobject.remove_source(self.id)
00138 except:
00139 pass
00140
00141 try:
00142 self.file.close()
00143 except:
00144 pass
00145
00146 self.id = None
00147 self.file = None
00148
00149 self.emit('closed')
00150
00151 gobject.type_register(Channel)
00152 gobject.signal_new('opened', Channel, 0, gobject.TYPE_NONE, [])
00153 gobject.signal_new('closed', Channel, 0, gobject.TYPE_NONE, [])
00154
00155 class WriteChannel (Channel):
00156 output = False
00157
00158 def __init__(self):
00159 Channel.__init__(self)
00160
00161 def open(self, fd):
00162 file = os.fdopen(fd, 'w')
00163 Channel.open(self, file)
00164
00165 def write(self, data):
00166 assert self.file
00167 self.file.write(data)
00168
00169 def flush(self):
00170 assert self.file
00171 self.file.flush()
00172
00173 gobject.type_register(WriteChannel)
00174
00175 class ReadChannel (Channel):
00176 output = True
00177
00178 def __init__(self, consumers=[]):
00179 Channel.__init__(self, gobject.IO_IN, self._read)
00180 self.consumers = set(consumers)
00181
00182 def open(self, fd):
00183 fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
00184 file = os.fdopen(fd, 'r')
00185 Channel.open(self, file)
00186
00187 def _read(self, file, flush):
00188 while True:
00189 data = file.read()
00190 if data:
00191 for callback in self.consumers:
00192 callback(data)
00193
00194 if not flush or not data:
00195 break
00196
00197 def write(self, data):
00198 for callback in self.consumers:
00199 callback(data)
00200
00201
00202 gobject.type_register(ReadChannel)
00203
00204
00205
00206
00207 class Executor (gobject.GObject):
00208 def __init__(self):
00209 gobject.GObject.__init__(self)
00210
00211 self.channels = []
00212 self.open_channels = set()
00213
00214 self.pid = None
00215 self.watch = None
00216 self.status = None
00217
00218 def redirect_input(self, fileno):
00219 return self.redirect(fileno, WriteChannel())
00220
00221 def redirect_output(self, fileno):
00222 return self.redirect(fileno, ReadChannel())
00223
00224 def redirect(self, fileno, channel):
00225 channel.connect('opened', self.on_channel_opened)
00226 channel.connect('closed', self.on_channel_closed)
00227
00228 self.channels.append((fileno, channel))
00229 return channel
00230
00231 def start(self, job):
00232 assert not self.is_alive()
00233
00234 self.status = None
00235 self.pid = job(self.channels)
00236 self.watch = gobject.child_watch_add(self.pid, self.on_child_exited)
00237
00238 def stop(self):
00239 try:
00240 self.kill(signal.SIGTERM)
00241 except:
00242 pass
00243
00244 def kill(self, signal):
00245 if self.pid is not None:
00246 os.kill(self.pid, signal)
00247
00248 def is_alive(self):
00249 return (self.pid is not None) or self.open_channels
00250
00251 def on_channel_opened(self, channel):
00252 self.open_channels.add(channel)
00253
00254 def on_channel_closed(self, channel):
00255 self.open_channels.remove(channel)
00256
00257 self._maybe_emit_stopped()
00258
00259 def on_child_exited(self, pid, status):
00260 self.pid = None
00261 self.watch = None
00262 self.status = status
00263
00264 self._maybe_emit_stopped()
00265
00266 def _maybe_emit_stopped(self):
00267 if (self.pid is None) and (not self.open_channels):
00268 self.emit('stopped', self.status)
00269
00270 gobject.type_register(Executor)
00271 gobject.signal_new('stopped', Executor, 0, gobject.TYPE_NONE, [gobject.TYPE_INT])
00272
00273
00274
00275
00276 class Job (gobject.GObject):
00277 '''Describes command parameters. The 'args', 'vars' and 'workdir' attributes are publicly
00278 accessible.'''
00279
00280 def __init__(self, args, vars=None, workdir=None):
00281 '''ARGS is the argument vector starting with the program name or path.
00282 VARS is a dictionary of additional environment variables.
00283 WORKDIR is the current working directory of the command.'''
00284
00285 gobject.GObject.__init__(self)
00286
00287 self.args = args
00288 self.vars = vars
00289 self.workdir = workdir
00290
00291 def __iter__(self):
00292 return iter((self.args, self.vars, self.workdir))
00293
00294 def __repr__(self):
00295 return '%s(%s, %s, %s)' % (Job.__name__, repr(self.args), repr(self.vars), repr(self.workdir))
00296
00297 def __call__(self, channels=[]):
00298 return spawn(self.args, self.vars, self.workdir, channels)
00299
00300 gobject.type_register(Job)
00301
00302
00303
00304
00305 class Hook (gobject.GObject):
00306 def __init__(self, func):
00307 gobject.GObject.__init__(self)
00308 self.func = func
00309
00310 def __repr__(self):
00311 return '%s(%s)' % (Job.__name__, repr(self.func))
00312
00313 def __str__(self):
00314 return self.func.func_name
00315
00316 def __call__(self):
00317 return self.func()
00318
00319 gobject.type_register(Hook)
00320
00321
00322
00323
00324 class Batch (gobject.GObject):
00325 '''Sequence of jobs that can be monitored via signals. The "job-started" signal is emitted
00326 when a new job starts executing (callbacks are invoked with the Job as a parameter).
00327 "finished" is emitted when a job fails or the whole batch finishes (callbacks are
00328 invoked with the exit status as a parameter).'''
00329
00330 def __init__(self, jobs=None):
00331 '''Initialise an empty batch.'''
00332
00333 gobject.GObject.__init__(self)
00334 self.connect('job-started', self.on_started)
00335
00336 self.executor = Executor()
00337 self.executor.connect('stopped', self.on_executor_stopped)
00338
00339 self.channels = {}
00340
00341 if jobs:
00342 self.jobs = jobs[:]
00343 else:
00344 self.jobs = []
00345
00346 self.position = 0
00347
00348 def get_input(self, fileno=0):
00349 '''Return (and possibly initialize) input channel.'''
00350
00351 channel = self.channels.get(fileno)
00352 if channel is None:
00353 assert not self.executor.is_alive()
00354
00355 channel = self.executor.redirect_input(fileno)
00356 self.channels[fileno] = channel
00357 else:
00358 assert not channel.output
00359
00360 return channel
00361
00362 def get_output(self, fileno=1):
00363 '''Return (and possibly initialize) output channel.'''
00364
00365 channel = self.channels.get(fileno)
00366 if channel is None:
00367 assert not self.executor.is_alive()
00368
00369 channel = self.executor.redirect_output(fileno)
00370 self.channels[fileno] = channel
00371 else:
00372 assert channel.output
00373
00374 return channel
00375
00376 def get_error(self):
00377 '''Return (and possibly initialize) error output channel.'''
00378
00379 return self.get_output(2)
00380
00381 def add(self, job):
00382 '''Add JOB to be executed in this batch.'''
00383
00384 self.jobs.append(job)
00385
00386 def extend(self, jobs):
00387 '''Add JOBS to be executed in this batch.'''
00388
00389 self.jobs.extend(jobs)
00390
00391 def remove(self, job):
00392 '''Remove JOB from this batch. The batch must not be running.'''
00393
00394 assert not self.executor.is_alive()
00395 self.jobs.remove(job)
00396
00397 def clear(self):
00398 '''Remove all jobs from this batch. The batch must not be running.'''
00399
00400 assert not self.executor.is_alive()
00401 del self.jobs[:]
00402
00403 def rewind(self):
00404 '''Batch position will be reset. The batch must not be running.'''
00405
00406 assert not self.executor.is_alive()
00407 self.position = 0
00408
00409 def start(self):
00410 '''Execute the remaining jobs.'''
00411
00412 while self.position < len(self.jobs):
00413 job = self.jobs[self.position]
00414
00415 if isinstance(job, Job):
00416 self.emit('job-started', job)
00417 return
00418 else:
00419 original = sys.stdout, sys.stderr
00420 try:
00421 sys.stdout = self.get_output()
00422 sys.stderr = self.get_error()
00423
00424 job()
00425 finally:
00426 sys.stdout, sys.stderr = original
00427
00428 self.position += 1
00429
00430 self.emit('finished', 0)
00431
00432 def stop(self):
00433 '''Stop the execution.'''
00434
00435 self.executor.stop()
00436
00437 def is_alive(self):
00438 '''Check if one of the jobs is currently running.'''
00439
00440 return self.executor.is_alive()
00441
00442 def on_started(self, batch, job):
00443 self.executor.start(job)
00444
00445 def on_executor_stopped(self, executor, status):
00446 if status == 0:
00447 self.position += 1
00448 self.start()
00449 else:
00450 self.emit('finished', status)
00451
00452 gobject.type_register(Batch)
00453 gobject.signal_new('job-started', Batch, 0, gobject.TYPE_NONE, [gobject.GObject])
00454 gobject.signal_new('finished', Batch, 0, gobject.TYPE_NONE, [gobject.TYPE_INT])