[PEAK] Trellis API sketches, part 1

Phillip J. Eby pje at telecommunity.com
Fri Jul 6 17:05:34 EDT 2007

For about the last 24 hours, I've been feverishly trying to balance 
out a host of design constraints on both the API and the 
implementation of the coming Trellis package.  I wanted to bypass 
some of the limitations of the previous designs of both the Trellis 
and its inspirations (such as Cells and PyCells).

In particular, one of my day-job requirements is that cells be able 
to have both rules *and* values in the same cell, and handle circular 
dependencies.  For example::

     class ScheduleItem(trellis.Component):

         trellis.values(
             startTime = None,
             duration  = timedelta(minutes=30),
             endTime   = None,
         )

         trellis.rules(
             startTime = lambda self: datetime.now(),
             duration  = lambda self: self.endTime - self.startTime,
             endTime   = lambda self: self.startTime + self.duration,
         )

The idea here is that if you change any one of the item's start time, 
end time, or duration, the other values will update themselves 
accordingly, as will anything that depends on them, recursively -- 
including UI views, if applicable.  When you initially create a 
ScheduleItem, its startTime will be "now", and it will have a default 
duration of 30 minutes.  However, you can override any of these 
initial values by passing in keyword arguments.
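
To make the intended behaviour concrete, here is a rough plain-Python approximation that does not use the Trellis at all.  The class name `PlainScheduleItem` and its hand-written setters are hypothetical; the whole point of the Trellis is that you would not write this bookkeeping yourself.  The sketch also makes one arbitrary choice that the example above leaves open: changing startTime keeps the duration and moves endTime.

```python
from datetime import datetime, timedelta

class PlainScheduleItem(object):
    """Hand-maintained stand-in for the ScheduleItem sketch (hypothetical)."""

    def __init__(self, startTime=None, duration=None, endTime=None):
        # Defaults mirror the sketch: start "now", 30-minute duration.
        self._start = startTime if startTime is not None else datetime.now()
        if duration is None and endTime is not None:
            duration = endTime - self._start
        self._duration = duration if duration is not None else timedelta(minutes=30)

    @property
    def startTime(self):
        return self._start

    @startTime.setter
    def startTime(self, value):
        # Assumption: moving the start keeps the duration and shifts the end.
        self._start = value

    @property
    def duration(self):
        return self._duration

    @duration.setter
    def duration(self, value):
        self._duration = value

    @property
    def endTime(self):
        # endTime is always derived, so it can never be out of date.
        return self._start + self._duration

    @endTime.setter
    def endTime(self, value):
        # Changing the end keeps the start and recomputes the duration.
        self._duration = value - self._start


item = PlainScheduleItem(startTime=datetime(2007, 7, 6, 17, 0))
item.endTime = datetime(2007, 7, 6, 18, 0)   # duration becomes one hour
item.startTime = datetime(2007, 7, 6, 9, 0)  # endTime follows the new start
```

Even for three attributes, the update logic has to be spread across several setters, and nothing outside the object is told about the change.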

All attributes with rules are immediately brought up to date when an 
object is created, right after their keyword argument values have 
been assigned.  This is a bit different from peak.binding's approach, 
where attributes are lazy by default, and can be marked for setup 
"upon assembly" of a component hierarchy.  Here, attributes are eager 
*unless* they are explicitly marked "optional".

(Optional attributes aren't forced to initialization when an object 
is created, so this allows saving some memory or recalculation 
time.  It's just an optimization feature, though.)

The real magic of the Trellis, however, isn't visible from the above 
simple example.  The main features of the Trellis are that it:

1. Prevents race conditions in event-driven code, by ensuring that 
(a) whenever you read a value, it's always "up to date", (b) whenever 
you change a value, the new value doesn't take effect until everyone 
who depends on the *previous* value has had a chance to read it, and 
(c) if more than one event handler changes the same value, they are 
required to change it in exactly the same way (thereby enforcing 
order independence among handlers).

2. Eliminates the need for callback management, by automatically 
detecting when a value is read, and thus automatically tracking 
dependencies.  Recalculation occurs only when at least one dependency 
has actually changed its value, and is guaranteed to occur for the 
absolute minimum possible subset of the dependency graph reachable 
from any changed values.
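
The read-tracking idea in point 2 can be illustrated with a toy cell class.  This is only a minimal sketch under my own assumptions, not the Trellis implementation: the `Cell` class and its `get`/`set` methods are hypothetical names, and this naive version makes no attempt at the consistency guarantees of point 1 or at recalculating a minimal subset of a larger graph.

```python
class Cell(object):
    """A value or a rule whose reads are tracked automatically (toy version)."""

    _computing = []   # stack of cells whose rules are currently running

    def __init__(self, value=None, rule=None):
        self.rule = rule
        self.listeners = set()   # cells whose rules read this cell
        self.calls = 0           # how many times the rule has run
        self._value = value
        if rule is not None:
            self._recalc()

    def get(self):
        # Reading a cell while a rule runs records a dependency.
        if Cell._computing:
            self.listeners.add(Cell._computing[-1])
        return self._value

    def set(self, value):
        if value != self._value:      # unchanged values trigger nothing
            self._value = value
            self._notify()

    def _recalc(self):
        Cell._computing.append(self)
        try:
            new = self.rule()
        finally:
            Cell._computing.pop()
        self.calls += 1
        if new != self._value:
            self._value = new
            self._notify()

    def _notify(self):
        listeners, self.listeners = self.listeners, set()
        for cell in listeners:
            cell._recalc()   # re-running the rule re-subscribes it


width = Cell(3)
height = Cell(4)
unrelated = Cell(99)
area = Cell(rule=lambda: width.get() * height.get())

width.set(5)        # area read width, so it is recalculated
unrelated.set(100)  # nothing read this cell, so nothing is recalculated
width.set(5)        # same value again, so nothing is recalculated
```

Nobody registered a callback here: the `area` rule became a listener of `width` and `height` simply by reading them.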

The combination of these two features makes event-driven code *much* 
simpler than callback-based code, not only because you don't 
explicitly manage callbacks, but also because it's not necessary to 
manage the sequence of events.  This is an advantage similar to the 
advantage Python's refcounting/garbage collection provides over the 
explicit memory management of C or C++.
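
Parts (b) and (c) of point 1 can be pictured as a buffer of pending writes that is applied all at once.  The sketch below is an illustration under my own assumptions; `Pulse` and `ConflictError` are hypothetical names and say nothing about how the Trellis will actually implement this.

```python
class ConflictError(Exception):
    """Raised when two handlers try to change a value in different ways."""


class Pulse(object):
    """Collects writes and applies them together (hypothetical helper)."""

    def __init__(self, values):
        self.values = dict(values)   # what readers see during the pulse
        self.pending = {}            # writes waiting to take effect

    def read(self, name):
        return self.values[name]     # always the pre-pulse value

    def write(self, name, value):
        if name in self.pending and self.pending[name] != value:
            raise ConflictError("conflicting writes to %r" % (name,))
        self.pending[name] = value

    def commit(self):
        self.values.update(self.pending)
        self.pending = {}


pulse = Pulse({"count": 1})
pulse.write("count", 2)
seen_during_pulse = pulse.read("count")   # still the previous value
pulse.write("count", 2)                   # the same change twice is fine
try:
    pulse.write("count", 3)               # a different change is rejected
    conflict = False
except ConflictError:
    conflict = True
pulse.commit()
```

Because readers only ever see the pre-pulse value and conflicting writes are refused, the order in which handlers run cannot affect the outcome.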

For example, if you look at the code in peak.running.process, you'll 
see some fairly complex code that tries to "simultaneously" update a 
number of observable values, based on a subprocess's exit code, if 
any.  It's complicated because it has to first disable all of the 
events that might be affected, then re-enable them afterwards.  All 
this is just to prevent code that's paying attention to the values 
seeing inconsistencies.

Comparable code for a Trellis-based version is considerably more 
succinct, and far less tricky, however::

     class Process(trellis.Component):

         last_check = NOT_YET

         trellis.values(
             min_life = 10,       # 10 seconds minimum lifetime
             poll_every = 30,     # check for exit every 30 seconds
             busy = False,
             idle = NOT_YET,
             exited = None,
             pid = None,
             child_status = None,
             started = NOT_YET,
         )

         trellis.rules(
             active = lambda self:
                 not self.exited and not self.stopped,

             exited = lambda self:
                 self.child_status and os.WIFEXITED(self.child_status),

             exitStatus = lambda self:
                 self.exited and os.WEXITSTATUS(self.child_status),

             exitedForSignal = lambda self:
                 self.child_status and os.WIFSIGNALED(self.child_status) and
                     os.WTERMSIG(self.child_status),

             stopped = lambda self:
                 self.child_status and os.WIFSTOPPED(self.child_status),

             stoppedForSignal = lambda self:
                 self.stopped and os.WSTOPSIG(self.child_status),

             started = lambda self: self.started | self.pid,

             idle   = lambda self:
                 self.idle | (not self.busy and self.started(self.min_life)),
         )

         def pid(self):
             if self.pid is None:
                 # code to start the process and return a pid
                 pass
             return self.pid

         def child_status(self):
             if self.child_status is not None:
                 return self.child_status
             if (Signals.SIGCHLD | self.last_check(self.poll_every)):
                 self.last_check = Time.now
                 while True:
                     try:
                         p, s = os.waitpid(self.pid, os.WNOHANG)
                     except OSError, v:
                         if v.args[0]==errno.ECHILD:
                             return None
                         elif v.args[0]==errno.EINTR:
                             continue    # retry
                         else:
                             raise  #log.exception("Unexpected error in waitpid()")
                     else:
                         if p==self.pid:
                             return s
                         return None

The special NOT_YET value, Time.now, and the '|' operator deserve 
some explanation here.  Time.now returns a timestamp object 
representing the current time being seen by all rules at the 
moment.  Timestamps can be called with an argument, to determine 
whether it has been at least that many seconds since the 
timestamp.  Timestamps can be combined with true or false values 
using the '|' operator.  A true value causes the same timestamp to be 
returned, while a false value returns NOT_YET.

NOT_YET is a special timestamp object.  When called, it always 
returns False.  When combined with a true value using '|', it returns 
a timestamp equal to Time.now.

The net result is that timestamps let you keep track of how long it 
has been since some event.  You can either explicitly track them (as 
is done by the 'child_status' rule setting last_check to Time.now), 
or you can use them to monitor conditions (as is done by the 'idle' rule).
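
The timestamp semantics described above can be mocked up in a few lines.  This is a sketch under stated assumptions, not the planned implementation: the `Clock` object standing in for the Trellis's notion of the current time is hypothetical, and I am assuming that NOT_YET combined with a *false* value simply stays NOT_YET.

```python
class Clock(object):
    """Hypothetical stand-in for the time seen by all rules."""
    def __init__(self):
        self.seconds = 0.0

CLOCK = Clock()


class Timestamp(object):
    def __init__(self, when):
        self.when = when

    def __call__(self, seconds):
        # True once at least `seconds` have elapsed since this timestamp.
        return CLOCK.seconds - self.when >= seconds

    def __or__(self, condition):
        # A true condition keeps the timestamp; a false one resets it.
        return self if condition else NOT_YET


class _NotYet(object):
    def __call__(self, seconds):
        return False   # no event has happened, so no time has passed

    def __or__(self, condition):
        # The first true condition starts timing from the current moment.
        # Assumption: a false condition leaves NOT_YET unchanged.
        return Timestamp(CLOCK.seconds) if condition else self


NOT_YET = _NotYet()

idle = NOT_YET
idle = idle | True          # condition became true at t=0
CLOCK.seconds = 45
still_idle = idle | True    # same timestamp is kept while it stays true
CLOCK.seconds = 70
```

At this point `idle(60)` is true, because the condition has held continuously for 70 seconds, while `(idle | False)` would fall back to NOT_YET and start the count over.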

Notice, too, that the 'idle' rule references 'self.idle' -- when a 
rule refers to itself (directly or indirectly), it will see its 
*previous* value.  So, the 'idle' rule basically says, "keep track of 
the most recent moment at which we began being not busy, as long as 
that moment is after we've been started for min_life seconds".

Thus, rules that want to know whether a process has been idle a 
certain amount of time can use 'if process.idle(timeout):'.  Such 
rules will be recalculated whenever the process's 'busy' status 
changes, or when it has been continuously idle for the requested time period.

The 'child_status' rule also uses the '|' operator, but for a 
different purpose.  It's checking whether a signal has fired, or 
whether it's been poll_every seconds since the last attempt at 
checking the exit status.  It checks them using '|' instead of 'or', 
in order to *not* short-circuit the calculation.  Otherwise, if the 
first condition were true, the second condition would not be checked, 
and both need to be checked in order to ensure that the rule will 
still depend on both conditions.
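
The difference between 'or' and '|' is easy to demonstrate in plain Python.  In this small sketch, the `condition` helper is a hypothetical stand-in for reading a tracked value: it records that it was consulted, which is what dependency tracking relies on.

```python
reads = []

def condition(name, value):
    """Record that a condition was consulted, then return its value."""
    reads.append(name)
    return value

# `or` stops at the first true operand, so "timer" is never consulted.
reads[:] = []
condition("signal", True) or condition("timer", False)
reads_with_or = list(reads)

# `|` evaluates both operands before combining them.
reads[:] = []
condition("signal", True) | condition("timer", False)
reads_with_pipe = list(reads)
```

With 'or', only "signal" is ever read, so a rule written that way would lose its dependency on the timer whenever the signal condition happened to be true.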

The 'busy' attribute doesn't have any rule defined, so it will only 
change if it's explicitly set.  However, you could easily define a 
rule for it in a subclass, if desired::

     def busy(self):
         if self.stream is not None and IO.readable(self.stream):
             try:
                 byte = self.stream.read(1)
             except ValueError:
                 # already closed, get rid of it
                 self.stream = None
             else:
                 if byte=='+': return True
                 if byte=='-': return False

         # no change in current status
         return self.busy

The above implementation basically duplicates a similar feature in 
peak.tools.supervisor, but with a bit less code and a LOT less complexity.

With these features, we could now implement a lightweight version of 
the supervisor tool's process maintenance features::

     class ProcessMonitor(trellis.Component):

         trellis.values(
             processes = (),
             timeout = 60,
             start_if_busy_for = 5,
             min_processes = 1,
             max_processes = 5,
             all_busy_for = NOT_YET,
             desired_processes = 0,
             start_delay  = 15,
             last_started = None,
         )

         trellis.rules(
             active_processes = lambda self:
                 tuple(p for p in self.processes if p.active),

             idle_processes = lambda self:
                 tuple(p for p in self.active_processes if p.idle(self.timeout)),

             all_busy_for = lambda self:
                 self.all_busy_for | all(p.busy for p in self.active_processes),
         )

         def desired_processes(self):
             desired = self.desired_processes or self.min_processes
             desired += self.all_busy_for(self.start_if_busy_for)
             desired -= len(self.idle_processes)
             desired = min(desired, self.max_processes)
             desired = max(desired, self.min_processes)
             return desired

         def processes(self):
             active = self.active_processes
             to_kill = len(active) - self.desired_processes
             if to_kill > 0:
                 # kill the excess
                 for p in self.idle_processes[:to_kill]:
                     p.kill()
             elif to_kill < 0:
                 if not self.last_started or self.last_started(self.start_delay):
                     active += (Process(...),)
                     self.last_started = Time.now
             return active

The above code automatically starts enough processes to ensure that 
there are min_processes running at all times, but never more than 
max_processes.  Between min and max, the current number of processes 
will grow by one whenever all processes have been continuously busy 
for a set period of time.  All processes that have been idle for a 
certain amount of time will be targeted for immediate shutdown, up to 
the number that would cause the number of processes to fall below 
min_processes.  No matter what, there is a minimum delay of 
start_delay between process launches, even if the launched processes 
are immediately exiting.
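
The arithmetic in the 'desired_processes' rule is easier to check when pulled out as a pure function.  This is just a restatement for illustration; the function signature and parameter names are mine, with the previous value and the two conditions passed in explicitly instead of being read from cells.

```python
def desired_processes(previous, all_busy_long_enough, idle_count,
                      min_processes=1, max_processes=5):
    """Pure-function restatement of the desired_processes rule."""
    desired = previous or min_processes
    desired += int(all_busy_long_enough)   # grow by one when saturated
    desired -= idle_count                  # shrink by the number of idlers
    desired = min(desired, max_processes)  # never exceed the ceiling
    desired = max(desired, min_processes)  # never drop below the floor
    return desired


startup = desired_processes(0, False, 0)     # no previous value yet
growing = desired_processes(3, True, 0)      # everyone busy: add one
capped = desired_processes(5, True, 0)       # already at max_processes
shrinking = desired_processes(4, False, 6)   # many idlers: clamp to min
```

With the default limits, these four cases work out to 1, 4, 5 and 1 processes respectively.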

I believe this supports all the features of peak.tools.supervisor's 
core process monitoring, but in a heck of a lot less code and 
complexity than the corresponding portions of the 
original.  Actually, it supports slightly more, in that it handles 
signalling idle children to die off, although it will admittedly send 
them an awful lot of signals, unless Process.kill() implements some 
sort of delay, e.g.:

     last_kill = None
     kill_retry = 5

     def kill(self):
         if not self.last_kill or self.last_kill(self.kill_retry):
             self.last_kill = Time.now
             # send self.pid a signal here

Anyway, this whole implementation is virtually self-evident as to 
intention when compared to the peak.tools.supervisor and 
peak.running.process equivalents, which are based on peak.events and 
must do a lot of callback/event management instead of just specifying 
intentional rules.

So, this is a pretty good set of candidate examples for some basic 
temporal rule handling and simple value management.  In later emails, 
I'll present some motivating examples for various additional API 
features, and sketch some algorithmic details of the plans for implementation.
