[PEAK] Reactor-driven microthreads

Phillip J. Eby pje at telecommunity.com
Wed Dec 31 15:34:03 EST 2003


At 01:12 PM 12/31/03 -0500, Phillip J. Eby wrote:
>For clarity, I've omitted the part where the generator in readlines() 
>accumulates the data, pushes back partial lines, etc.  As you can see, the 
>only new concept we need is a "queue", or perhaps we should call it a 
>"pipe".  Actually, the concept is probably used often enough that it would 
>be easier to do:
>
>     def readlines(self):
>         def genLines():
>             # ... Loop to accumulate a line
>             queue.put(line)
>         return Queue(self, run=genLines())

Or, even better, from an API perspective, we could define a 'queued' 
wrapper as follows:

from peak.util.advice import advice

class queued(advice):

     __slots__ = ()

     def __call__(__advice, self, *__args, **__kw):
         queue = Queue(self)
         Thread(self).run(
             __advice._func(self, queue, *__args, **__kw)
         )
         return queue


and then do:

     def readlines(self,queue):
         # ... Loop to accumulate a line, yielding as needed
         queue.put(line)

     readlines = queued(readlines)

The idea here is that 'queued' generator methods of components now do the 
right thing, running in a new "thread" and receiving an extra argument for 
the queue they communicate their results through.  The 'queue' argument 
isn't seen or passed by the caller, any more than one normally passes 
'self'.  Admittedly, this convention could be confusing.  An alternative 
would be to pass the queue *first*, so that queued methods are defined like 
this:

     def readlines(queue, self, ...):
        # ...

This might provide a visible hint that something different is happening 
here.  The advice wrapper could also be extended to check that the wrapped 
function defined 'self' as the second parameter at wrapping time.

Thoughts, anyone?

Actually, while on the subject of thoughts...  any ideas what to *call* 
this?  I'd like to have it be convenient to say 'foo.Thread', 'foo.Queue', 
etc. in the same way that we have binding, naming, running, logs, 
etc.  (Although in a4 'logs' will probably go away as an "API" framework, 
because it no longer exposes even constants as an API.).

Anyway, I don't want to call it 'thread', 'threads', or 'threading', to 
avoid confusion with similar Python modules or with "real" 
threads.  'pthreads' (for pseudothreads) is out due to likely confusion 
with the popular C-level 'pthread' package.  I had been thinking 'mthread' 
(for microthreads), but threads aren't even really all that central of a 
concept here.  This is almost like "coroutines" or "co-operative 
multitasking" or "event-driven progamming".

So, maybe it could be 'coop.Thread', 'coop.Queue', etc., except those look 
to me more like "coop" (as in chicken coop) than like co-op.  'multi' seems 
too generic.  Remaining candidates:

task
tasks
tasking
events

These are all pretty generic, though, and perhaps imply greater generality 
than intended.  'events.Queue', 'events.Thread', and 'events.Sleep' all 
read very nicely, although again they seem to give the package greater 
scope than originally intended.  That is, at some point PEAK is likely to 
have various other kinds of "events" besides these.  OTOH, one of the main 
candidates for other kinds of events is GUI support, and this kind of 
"threading" is just as useful/relevant in such an environment.   Indeed, 
anything that needs to respond to a sequence of "events" is likely to be 
more easily expressed as a thread.

Any objections to adding 'peak.events' as a framework API?  That is, 'from 
peak.api import *' would include 'events', but 'from peak.core import *' 
would not.  The initial API it provides would probably include:

events.IThread
events.IEventSource (formerly IThreadScheduler)

events.Thread (implements IThread)

Event sources (implementing IEventSource):
     events.Sleep(secs)      (triggers after seconds)
     events.Pause            (==Sleep(0))
     events.AnyOf(*args)     (triggers when any of its args occur)
     events.Sequence(*args)  (triggers after all of its args occur, in 
sequence)
     events.Readable(fileno) (triggers when file is readable)
     events.Writable(fileno) (triggers when file is writable)
     events.Queue            (triggers when queue contains data)
     events.Value            (triggers when value is set/changed)
     events.Thread           (triggers when thread is finished running)

(and later, we might add GUI-based event sources as well)

resume()  (I'm actually tempted to make this a primitive or at least export 
it from peak.api)


A lot of the work I've done lately on processes, timers, and loggers is 
quite related; I expect that I could refactor many of their existing 
"listener" mechanisms to use queues and event sources.  Thus, measuring a 
duration or invoking a log can be an event source that prompts threads to 
be resumed.  (Open issue: how to control/tell whether a given thread should 
be resumed in the current or following reactor cycle, without adding an 
explicit Pause.)

(You know, if Python could only pickle generator-iterators, we could 
actually implement persistent workflows with this thing...  Hm.  Maybe with 
a little help from C...)

It's almost sad how much work I put into the process supervisor system; so 
many of its most convoluted aspects would be more cleanly represented as 
threads.  Heck, the entire peak.running.daemons scheduling system might be 
better represented as threads that put themselves in the task queue after 
sleeping between iterations.  The task queue would be monitored by a thread 
that then resumed whichever waiting thread had the highest priority.  And 
the whole thing would be a lot clearer, although the lack of try-finally 
might be occasionally inconvenient.

*Very* interesting, indeed.




More information about the PEAK mailing list