execution.py

Go to the documentation of this file.
00001 # Copyright 2005, 2006  Timo Savola
00002 #
00003 # This program is free software; you can redistribute it and/or modify
00004 # it under the terms of the GNU Lesser General Public License as published
00005 # by the Free Software Foundation; either version 2.1 of the License, or
00006 # (at your option) any later version.
00007 
00008 '''Asynchronous command execution.  Integrated into the GLib main loop.'''
00009 
00010 import sys
00011 import os
00012 import fcntl
00013 import signal
00014 import gobject
00015 
00016 import sets
00017 set = sets.Set
00018 
00019 ##
00020 ## Spawn
00021 
00022 def spawn(args, vars=None, workdir=None, channels=[]):
00023         assert type(args) == list and len(args) > 0
00024         assert not vars or type(vars) == dict
00025 
00026         pipes = [os.pipe() for entry in channels]
00027 
00028         pid = os.fork()
00029         if pid == 0:
00030                 for fd in xrange(1024):
00031                         try:
00032                                 fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
00033                         except:
00034                                 pass
00035 
00036                 try:
00037                         if vars:
00038                                 os.environ.update(vars)
00039 
00040                         if workdir:
00041                                 os.chdir(workdir)
00042 
00043                         for fileno, channel in channels:
00044                                 fd = pipes.pop(0)
00045 
00046                                 if channel.output:
00047                                         master, slave = fd
00048                                 else:
00049                                         slave, master = fd
00050 
00051                                 os.close(master)
00052                                 if slave != fileno:
00053                                         os.dup2(slave, fileno)
00054                                         os.close(slave)
00055 
00056                         os.execvp(args[0], args)
00057 
00058                 except Exception, e:
00059                         try:
00060                                 print >>sys.stderr, '%s:' % args[0],
00061                         except:
00062                                 pass
00063 
00064                         try:
00065                                 print >>sys.stderr, e
00066                         except:
00067                                 pass
00068 
00069                 os._exit(127)
00070 
00071         for fileno, channel in channels:
00072                 fd = pipes.pop(0)
00073 
00074                 if channel.output:
00075                         master, slave = fd
00076                 else:
00077                         slave, master = fd
00078 
00079                 os.close(slave)
00080                 channel.open(master)
00081 
00082         return pid
00083 
00084 ##
00085 ## Channel
00086 
00087 class Channel (gobject.GObject):
00088         def __init__(self, condition=0, callback=None):
00089                 gobject.GObject.__init__(self)
00090 
00091                 self.file = None
00092                 self.condition = condition
00093                 self.callback = callback
00094 
00095         def open(self, file):
00096                 assert not self.file
00097 
00098                 self.file = file
00099 
00100                 mask = self.condition | gobject.IO_ERR | gobject.IO_HUP
00101                 self.id = gobject.io_add_watch(self.file, mask, self._watch)
00102 
00103                 self.emit('opened')
00104 
00105         def _watch(self, file, condition):
00106                 if condition & gobject.IO_ERR:
00107                         close = True
00108                 elif condition & gobject.IO_HUP:
00109                         if condition & self.condition:
00110                                 self.callback(file, flush=True)
00111 
00112                         close = True
00113                 else:
00114                         if condition & self.condition:
00115                                 self.callback(file, flush=False)
00116 
00117                         close = False
00118 
00119                 if close:
00120                         self.file = None
00121                         self.id = None
00122 
00123                         try:
00124                                 file.close()
00125                         except IOError:
00126                                 pass
00127 
00128                         self.emit('closed')
00129 
00130                         return False
00131                 else:
00132                         return True
00133 
00134         def close(self):
00135                 if self.id:
00136                         try:
00137                                 gobject.remove_source(self.id)
00138                         except:
00139                                 pass
00140 
00141                         try:
00142                                 self.file.close()
00143                         except:
00144                                 pass
00145 
00146                         self.id = None
00147                         self.file = None
00148 
00149                         self.emit('closed')
00150 
00151 gobject.type_register(Channel)
00152 gobject.signal_new('opened', Channel, 0, gobject.TYPE_NONE, [])
00153 gobject.signal_new('closed', Channel, 0, gobject.TYPE_NONE, [])
00154 
00155 class WriteChannel (Channel):
00156         output = False
00157 
00158         def __init__(self):
00159                 Channel.__init__(self)
00160 
00161         def open(self, fd):
00162                 file = os.fdopen(fd, 'w')
00163                 Channel.open(self, file)
00164 
00165         def write(self, data):
00166                 assert self.file
00167                 self.file.write(data)
00168 
00169         def flush(self):
00170                 assert self.file
00171                 self.file.flush()
00172 
00173 gobject.type_register(WriteChannel)
00174 
00175 class ReadChannel (Channel):
00176         output = True
00177 
00178         def __init__(self, consumers=[]):
00179                 Channel.__init__(self, gobject.IO_IN, self._read)
00180                 self.consumers = set(consumers)
00181 
00182         def open(self, fd):
00183                 fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
00184                 file = os.fdopen(fd, 'r')
00185                 Channel.open(self, file)
00186 
00187         def _read(self, file, flush):
00188                 while True:
00189                         data = file.read()
00190                         if data:
00191                                 for callback in self.consumers:
00192                                         callback(data)
00193 
00194                         if not flush or not data:
00195                                 break
00196 
00197         def write(self, data):
00198                 for callback in self.consumers:
00199                         callback(data)
00200 
00201 
00202 gobject.type_register(ReadChannel)
00203 
00204 ##
00205 ## Executor
00206 
00207 class Executor (gobject.GObject):
00208         def __init__(self):
00209                 gobject.GObject.__init__(self)
00210 
00211                 self.channels = []
00212                 self.open_channels = set()
00213 
00214                 self.pid = None
00215                 self.watch = None
00216                 self.status = None
00217 
00218         def redirect_input(self, fileno):
00219                 return self.redirect(fileno, WriteChannel())
00220 
00221         def redirect_output(self, fileno):
00222                 return self.redirect(fileno, ReadChannel())
00223 
00224         def redirect(self, fileno, channel):
00225                 channel.connect('opened', self.on_channel_opened)
00226                 channel.connect('closed', self.on_channel_closed)
00227 
00228                 self.channels.append((fileno, channel))
00229                 return channel
00230 
00231         def start(self, job):
00232                 assert not self.is_alive()
00233 
00234                 self.status = None
00235                 self.pid = job(self.channels)
00236                 self.watch = gobject.child_watch_add(self.pid, self.on_child_exited)
00237 
00238         def stop(self):
00239                 try:
00240                         self.kill(signal.SIGTERM)
00241                 except:
00242                         pass
00243 
00244         def kill(self, signal):
00245                 if self.pid is not None:
00246                         os.kill(self.pid, signal)
00247 
00248         def is_alive(self):
00249                 return (self.pid is not None) or self.open_channels
00250 
00251         def on_channel_opened(self, channel):
00252                 self.open_channels.add(channel)
00253 
00254         def on_channel_closed(self, channel):
00255                 self.open_channels.remove(channel)
00256 
00257                 self._maybe_emit_stopped()
00258 
00259         def on_child_exited(self, pid, status):
00260                 self.pid = None
00261                 self.watch = None
00262                 self.status = status
00263 
00264                 self._maybe_emit_stopped()
00265 
00266         def _maybe_emit_stopped(self):
00267                 if (self.pid is None) and (not self.open_channels):
00268                         self.emit('stopped', self.status)
00269 
00270 gobject.type_register(Executor)
00271 gobject.signal_new('stopped', Executor, 0, gobject.TYPE_NONE, [gobject.TYPE_INT])
00272 
00273 ##
00274 ## Job
00275 
00276 class Job (gobject.GObject):
00277         '''Describes command parameters.  The 'args', 'vars' and 'workdir' attributes are publicly
00278            accessible.'''
00279 
00280         def __init__(self, args, vars=None, workdir=None):
00281                 '''ARGS is the argument vector starting with the program name or path.
00282                    VARS is a dictionary of additional environment variables.
00283                    WORKDIR is the current working directory of the command.'''
00284 
00285                 gobject.GObject.__init__(self)
00286 
00287                 self.args = args
00288                 self.vars = vars
00289                 self.workdir = workdir
00290 
00291         def __iter__(self):
00292                 return iter((self.args, self.vars, self.workdir))
00293 
00294         def __repr__(self):
00295                 return '%s(%s, %s, %s)' % (Job.__name__, repr(self.args), repr(self.vars), repr(self.workdir))
00296 
00297         def __call__(self, channels=[]):
00298                 return spawn(self.args, self.vars, self.workdir, channels)
00299 
00300 gobject.type_register(Job)
00301 
00302 ##
00303 ## Hook
00304 
00305 class Hook (gobject.GObject):
00306         def __init__(self, func):
00307                 gobject.GObject.__init__(self)
00308                 self.func = func
00309 
00310         def __repr__(self):
00311                 return '%s(%s)' % (Job.__name__, repr(self.func))
00312 
00313         def __str__(self):
00314                 return self.func.func_name
00315 
00316         def __call__(self):
00317                 return self.func()
00318 
00319 gobject.type_register(Hook)
00320 
00321 ##
00322 ## Batch
00323 
00324 class Batch (gobject.GObject):
00325         '''Sequence of jobs that can be monitored via signals.  The "job-started" signal is emitted
00326            when a new job starts executing (callbacks are invoked with the Job as a parameter).
00327            "finished" is emitted when a job fails or the whole batch finishes (callbacks are
00328            invoked with the exit status as a parameter).'''
00329 
00330         def __init__(self, jobs=None):
00331                 '''Initialise an empty batch.'''
00332 
00333                 gobject.GObject.__init__(self)
00334                 self.connect('job-started', self.on_started)
00335 
00336                 self.executor = Executor()
00337                 self.executor.connect('stopped', self.on_executor_stopped)
00338 
00339                 self.channels = {}
00340 
00341                 if jobs:
00342                         self.jobs = jobs[:]
00343                 else:
00344                         self.jobs = []
00345 
00346                 self.position = 0
00347 
00348         def get_input(self, fileno=0):
00349                 '''Return (and possibly initialize) input channel.'''
00350 
00351                 channel = self.channels.get(fileno)
00352                 if channel is None:
00353                         assert not self.executor.is_alive()
00354 
00355                         channel = self.executor.redirect_input(fileno)
00356                         self.channels[fileno] = channel
00357                 else:
00358                         assert not channel.output
00359 
00360                 return channel
00361 
00362         def get_output(self, fileno=1):
00363                 '''Return (and possibly initialize) output channel.'''
00364 
00365                 channel = self.channels.get(fileno)
00366                 if channel is None:
00367                         assert not self.executor.is_alive()
00368 
00369                         channel = self.executor.redirect_output(fileno)
00370                         self.channels[fileno] = channel
00371                 else:
00372                         assert channel.output
00373 
00374                 return channel
00375 
00376         def get_error(self):
00377                 '''Return (and possibly initialize) error output channel.'''
00378 
00379                 return self.get_output(2)
00380 
00381         def add(self, job):
00382                 '''Add JOB to be executed in this batch.'''
00383 
00384                 self.jobs.append(job)
00385 
00386         def extend(self, jobs):
00387                 '''Add JOBS to be executed in this batch.'''
00388 
00389                 self.jobs.extend(jobs)
00390 
00391         def remove(self, job):
00392                 '''Remove JOB from this batch.  The batch must not be running.'''
00393 
00394                 assert not self.executor.is_alive()
00395                 self.jobs.remove(job)
00396 
00397         def clear(self):
00398                 '''Remove all jobs from this batch.  The batch must not be running.'''
00399 
00400                 assert not self.executor.is_alive()
00401                 del self.jobs[:]
00402 
00403         def rewind(self):
00404                 '''Batch position will be reset.  The batch must not be running.'''
00405 
00406                 assert not self.executor.is_alive()
00407                 self.position = 0
00408 
00409         def start(self):
00410                 '''Execute the remaining jobs.'''
00411 
00412                 while self.position < len(self.jobs):
00413                         job = self.jobs[self.position]
00414 
00415                         if isinstance(job, Job):
00416                                 self.emit('job-started', job)
00417                                 return
00418                         else:
00419                                 original = sys.stdout, sys.stderr
00420                                 try:
00421                                         sys.stdout = self.get_output()
00422                                         sys.stderr = self.get_error()
00423 
00424                                         job()
00425                                 finally:
00426                                         sys.stdout, sys.stderr = original
00427 
00428                                 self.position += 1
00429 
00430                 self.emit('finished', 0)
00431 
00432         def stop(self):
00433                 '''Stop the execution.'''
00434 
00435                 self.executor.stop()
00436 
00437         def is_alive(self):
00438                 '''Check if one of the jobs is currently running.'''
00439 
00440                 return self.executor.is_alive()
00441 
00442         def on_started(self, batch, job):
00443                 self.executor.start(job)
00444 
00445         def on_executor_stopped(self, executor, status):
00446                 if status == 0:
00447                         self.position += 1
00448                         self.start()
00449                 else:
00450                         self.emit('finished', status)
00451 
00452 gobject.type_register(Batch)
00453 gobject.signal_new('job-started', Batch, 0, gobject.TYPE_NONE, [gobject.GObject])
00454 gobject.signal_new('finished', Batch, 0, gobject.TYPE_NONE, [gobject.TYPE_INT])

Generated on Thu Jan 18 09:47:40 2007 for Encode by  doxygen 1.4.7