[PEAK] trellis activity does not save/restore Contextual context
between task switches
Andrew Svetlov
andrew.svetlov at gmail.com
Sat Mar 21 17:35:41 EDT 2009
As I have figured out, trellis activity tasks do not switch context
inside the task working loop. As a result, if my tasks use
something like context.new() in their own code, I cannot run two of them
together.
I made a patch for TaskCell._stepper to support this feature. The main
change is that STACK now contains not only the current generators for a task
but also the last context.State for every generator, and the states are
updated when needed.
--------------------------------------------------
from peak.events import trellis
from peak.events.activity import *
import sys
from peak import context
def _stepper(self, func):
    """Return the step function that drives the task generator ``func()``.

    Intended as a replacement for ``TaskCell._stepper`` that keeps one
    ``context.State`` per running generator, so that the context a task was
    paused in is re-installed when the task is resumed and the caller's
    context is put back when the step ends.

    Each ``STACK`` entry is a two-item list ``[generator, state]``: the
    generator being run and the ``context.State`` to install the next time
    it is resumed.

    :param func: zero-argument callable returning the task's top-level
        generator.
    :returns: ``_step``, a zero-argument callable that runs the task until
        it yields ``Pause``, yields a plain value at top level, or finishes,
        and then returns ``resume()``.
    """
    VALUE = self._result = []   # value waiting to be sent into a generator
    ERROR = self._error = []    # pending sys.exc_info() tuples to throw in
    STACK = [[func(), context.State.child()]]
    CALL = STACK.append
    RETURN = STACK.pop
    ctrl = trellis.ctrl

    def _step():
        if not STACK:
            # The task has already run to completion: the loop below would
            # have nothing to do, and STACK[-1] would raise IndexError.
            return resume()
        # Install the state saved for the innermost generator.  swap() is
        # relied on to hand back the state it replaced.
        outer = STACK[-1][1].swap()
        try:
            while STACK:
                try:
                    it = STACK[-1][0]
                    if VALUE and hasattr(it, 'send'):
                        rv = it.send(VALUE[0])
                    elif ERROR and hasattr(it, 'throw'):
                        rv = it.throw(*ERROR.pop())
                    else:
                        rv = it.next()
                except:
                    # Deliberately bare: whatever the generator raised is
                    # recorded so it can be thrown into the generator below
                    # it on the stack.
                    del VALUE[:]
                    ERROR.append(sys.exc_info())
                    if ERROR[-1][0] is StopIteration:
                        ERROR.pop() # not really an error
                    RETURN()
                else:
                    del VALUE[:]
                    if rv is Pause:
                        # Remember the context the generator paused in.
                        STACK[-1][1] = context.State.get()
                        break
                    elif hasattr(rv, 'next'):
                        # Nested generator: it starts in the current context.
                        CALL([rv, context.State.get()]); continue
                    elif isinstance(rv, Return):
                        rv = rv.value
                    VALUE.append(rv)
                    if len(STACK)==1: break
                    RETURN()
            if STACK and not ERROR and not ctrl.reads:
                ctrl.current_listener.dirty() # re-run if still running
        finally:
            # Always put the caller's context back, even when something
            # outside the inner try (the else-branch or dirty()) raised.
            outer.swap()
        return resume()

    return _step
def apply():
    """Install the context-aware ``_stepper`` on ``TaskCell``."""
    setattr(TaskCell, '_stepper', _stepper)
----------------------------------
The unit tests below cover this situation. Maybe I don't check all possible
cases, but my tests reflect my current understanding of the problem.
---------------------------------
from __future__ import with_statement
'''
Created on Mar 20, 2009
@author: asvetlov
'''
from peak.events import trellis, activity
from peak import context
import unittest
from fds_main.utilities.trellis import monkeypatch #monkeypatch required
# Apply the patch at import time, before any Workflow is instantiated
# (presumably this is the apply() that replaces TaskCell._stepper -- confirm).
monkeypatch.apply()
@context.setting
def val(value=1):
    """Context setting used by the tests; the input is coerced to ``int``."""
    coerced = int(value)
    return coerced
class Workflow(trellis.Component):
    """Test fixture: three tasks that each change ``val`` in their own context.

    Every task appends the value of ``val()`` it observes to its own
    ``retN`` list and sets ``doneN`` when its generator exits, so the tests
    can check which context each task saw after every pause.
    """

    # Completion flags, set in each task's ``finally`` clause.
    done1 = trellis.attr(False)
    done2 = trellis.attr(False)
    done3 = trellis.attr(False)

    # Values of ``val()`` observed by do1, do2 and do3/nested respectively.
    ret1 = trellis.make(list)
    ret2 = trellis.make(list)
    ret3 = trellis.make(list)

    @activity.task
    def do1(self):
        # ``global`` is needed because ``val <<= ...`` is an augmented
        # assignment to the module-level name.
        global val
        self.ret1.append(val())         # value seen before entering context.new()
        try:
            with context.new():
                val <<= 2
                self.ret1.append(val())
                yield activity.Pause    # give other tasks a chance to run
                self.ret1.append(val()) # tests expect 2 again here
        finally:
            self.done1 = True

    @activity.task
    def do2(self):
        # Same shape as do1, but sets ``val`` to 3 and records into ret2.
        global val
        self.ret2.append(val())
        try:
            with context.new():
                val <<= 3
                self.ret2.append(val())
                yield activity.Pause
                self.ret2.append(val()) # tests expect 3 again here
        finally:
            self.done2 = True

    def nested(self):
        # Sub-generator yielded from do3; records into ret3 as well.
        # Tests expect it to contribute 4, 4, 5, 4 to ret3.
        global val
        self.ret3.append(val())
        yield activity.Pause
        self.ret3.append(val())
        with context.new():
            val <<= 5
            yield activity.Pause
            self.ret3.append(val())
        yield activity.Pause
        self.ret3.append(val())
        yield 100                       # received by do3 as the value of ``yield self.nested()``

    @activity.task
    def do3(self):
        # NOTE(review): the extent of the ``with`` block is reconstructed
        # from the expected ret3 list in the tests (last value is the outer
        # one), since the original indentation was lost -- confirm.
        global val
        self.ret3.append(val())
        try:
            with context.new():
                val <<= 4
                self.ret3.append(val())
                yield activity.Pause
                self.ret3.append(val())
                a = yield self.nested()
                assert a==100
                self.ret3.append(val())
                yield activity.Pause
                self.ret3.append(val())
            yield activity.Pause
            self.ret3.append(val())     # outside context.new(): outer value again
        finally:
            self.done3 = True
class TestParallelTasks(unittest.TestCase):
    """Check that each task keeps its own context across pauses.

    Uses ``assertEqual`` rather than the deprecated ``assertEquals`` alias.
    """

    def test_default_context(self):
        # With no override in effect the setting has its default value.
        self.assertEqual(1, val())

    def test_parallel(self):
        # do1 and do2 run interleaved; neither may see the other's value.
        w = Workflow()
        while not w.done1 or not w.done2:
            activity.EventLoop.flush()
        self.assertEqual([1, 2, 2], w.ret1)
        self.assertEqual([1, 3, 3], w.ret2)

    def test_capture_nondefault_context_and_restores_after_end_task(self):
        global val
        with context.new():
            val <<= 10
            self.assertEqual(10, val())
            # Tasks created here start from the context with val == 10.
            w = Workflow()
            while not w.done1 or not w.done2:
                activity.EventLoop.flush()
            self.assertEqual([10, 2, 2], w.ret1)
            self.assertEqual([10, 3, 3], w.ret2)
        # Leaving context.new() restores the default.
        self.assertEqual(1, val())

    def test_check_switch_context_in_subtasks(self):
        global val
        with context.new():
            val <<= 10
            self.assertEqual(10, val())
            w = Workflow()
            while not w.done1 or not w.done2 or not w.done3:
                # The caller's context must be intact between task steps.
                self.assertEqual(10, val())
                activity.EventLoop.flush()
            self.assertEqual([10, 2, 2], w.ret1)
            self.assertEqual([10, 3, 3], w.ret2)
            self.assertEqual([10, 4, 4, 4, 4, 5, 4, 4, 4, 10], w.ret3)
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
-------------------------------------
Please review, and if this patch is clean, put it into the current trellis trunk.
If not, please point out where I'm wrong.
More information about the PEAK
mailing list