[PEAK] Reactor-driven microthreads
Phillip J. Eby
pje at telecommunity.com
Wed Dec 31 09:38:05 EST 2003
At Wed, 31 Dec 2003 00:22:11 -0500, I wrote:
>Well, it's getting awfully late, so I think I'll wait for another day to
>sort out all the details of exception handling. But it sounds like the
>only *really* open issue here is how schedulers' errors can be passed back
>into the generators in a relatively transparent fashion. Probably, that
>means that we'd end up with statements like:
>
> yield thread(until_something_changed)
>
>or:
>
> yield until_something_changed(thread)
>
>as either of these would let the thread object invoke the scheduler (or
>vice versa) while still "in" the generator's execution. So any errors
>would be thrown in the original context. Then, only errors occurring when
>the thread is "resumed" would have to be checked for inside the generator.
Eureka!!! As I was going to sleep, I figured out how to pass errors *into*
a generator. See below:
#=====
from __future__ import generators
import sys
no_exc = None, None, None
import traceback
def resume():
if sys.exc_info()<>no_exc:
try:
t,v,tb = sys.exc_info()
raise t,v,tb
finally:
t=v=tb=None
class Thread(object):
def __init__(self):
self.stack = []
self.call = self.stack.append
def run(self,iterator):
self.call(iterator)
self.step()
def step(self):
stack = self.stack
while stack:
try:
for scheduler in stack[-1]:
if scheduler is not None:
self.call(scheduler)
break
else:
# not scheduled, stop
return
else:
stack.pop()
except:
stack.pop()
if not stack:
raise
def finish(self):
while self.stack:
self.step()
def gen1():
try:
print "calling gen2"
yield gen2(); resume()
except:
print "caught",sys.exc_info()
traceback.print_exception(*sys.exc_info())
try:
print "calling gen2"
yield gen2(); resume()
except:
print "caught",sys.exc_info()
traceback.print_exception(*sys.exc_info())
try:
print "calling gen4"
yield gen4(); resume()
except:
print "caught",sys.exc_info()
traceback.print_exception(*sys.exc_info())
print "back from gen4"
try:
print "calling gen4"
yield gen4(); resume()
except:
print "caught",sys.exc_info()
traceback.print_exception(*sys.exc_info())
print "back from gen4"
def gen2():
yield gen3()
def gen3():
raise "foo"
yield None; resume()
def gen4():
yield None; resume()
print; print
t = Thread()
t.run(gen1())
# Simulate being re-run by a scheduler until done...
t.finish()
#=====
As you can see, by simply calling 'resume()' after each 'yield', errors
thrown outside the generator can be passed back "in" to it. The tracebacks
shown for these errors looks something like:
Traceback (most recent call last):
File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 64, in gen1
yield gen2(); resume()
File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 36, in step
for scheduler in stack[-1]:
File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 87, in gen3
raise "foo"
foo
That is, it looks from the traceback as though the generator where the
error occurred, was called directly from the generator that caught the
error. Unfortunately, this leaves out how you got from one to the
other. On the bright side, it makes the traceback shorter and easier to
read. :) And, it's really just amazing that you can do this at all.
The cool thing is that sys.exc_info() is thread-specific, making this
behave correctly. And, there's only a few lines that need to be changed to
enable the full range of scheduling options from my last e-mail, something
like:
for scheduler in stack[-1]:
if scheduler is not None:
if not
adapt(scheduler,IThreadScheduler).schedule(self):
# Scheduler isn't calling us back, so keep going
break
# done this iteration
return
Then, the implementation of IThreadScheduler.schedule(thread) for a
generator will be 'thread.call(theGenerator); return False'. For 'sleep',
it'd be something like 'reactor.callLater(seconds,thread.step); return
True'. Etc.
Microthreads, here we come!
More information about the PEAK
mailing list