[PEAK] Twisted Deferred as EventSources & Generators

Phillip J. Eby pje at telecommunity.com
Wed Jan 28 16:17:31 EST 2004


At 12:27 PM 1/28/04 -0800, Chad Rosenberg wrote:
>I've run into the problem where I need to wait for the results of a 
>Deferred while in _run of an AbstractCommand.  I noticed the following 
>doesn't work:
>
>     eventLoop = binding.Obtain(IEventSource)
>
>     def doSomeWork(self):
>         d = self.somePBCall()
>         result = self.eventLoop.runUntil(d)
>
>
>The above gives me:
>
>     ("Can't adapt", <Deferred at 0x1038af8>, <class 
> 'peak.events.interfaces.IEventSource'>)
>
>So I took a look at the adapter DeferredAsEventSource and noticed that it 
>only adapts a Deferred to ITaskSwitch, and not IEventSource (as the name 
>would seem to imply).   So, I suppose my question is: how does one obtain 
>the results of a deferred outside a generator/event.threaded function?

A simple way might be:

def deferredAsBroadcaster(deferred):
     data = events.Broadcaster()
     deferred.addCallback(lambda v: [data.send(v),v][1])
     return data

result = self.eventLoop.runUntil( deferredAsBroadcaster( self.somePBCall() ) )

This will return an event source you can use with runUntil.  Note that you 
may receive a Failure instance as your 'result' from Twisted if the 
deferred gets an error.

Of course, you can use a similar approach to hook up a Distributor, Value, 
or Condition instead of a Broadcaster, depending on your needs.


>I was wondering, if something like the following wouldn't be useful:
>
>     def doSomeWork(self):
>         d = self.getPBData()
>         yield d; result = events.resume()
>
>         if result == 'something bad':
>             raise Exception, 'bad'
>
>         yield events.exit(result)

Actually, I'm considering making Threads be event sources that fire their 
final (non task-switching) yield value as an output.  So 'yield result' 
would be sufficient in that case.


>     doSomeWork = events.waitFor(doSomeWork)
>
>     def _run(self):
>         try:
>             print self.doSomeWork()
>         except Exception, reason:
>             print reason
>
>Where event.waitFor would wrap a Thread and provide a blocking call.
>That way, as far as the caller is concerned, doSomeWork looks like any 
>other blocking method with a return value, and doSomeWork gets to use 
>events nice and cleanly.  Or is there already a way to do this that I have 
>missed?

I'd prefer to keep sync/async transitions both very explicit and very rare, 
so for now I'm not going to make synchronizing async code any 
easier.  There are a lot of issues you need to get right in doing that, 
relative to your overall process model, and it's way too early (in terms of 
our experience so far with these boundaries) to define what the "normal" 
way to handle them is.  You'll notice that 'runUntil()' has a bunch of 
parameters that are partly there to smooth over Twisted-vs-peak.events 
differences, and partly there to deal with various needs of existing 
code.  It's entirely possible that it will grow a few more parameters in 
the future.  So a 'waitFor()' function as you suggest is not really 
feasible without adding all those arguments into it.  (There's a further 
issue of deciding what event loop to run, as well.)





More information about the PEAK mailing list