[PEAK] Sync, async, loops, and threads
Phillip J. Eby
pje at telecommunity.com
Sun Jan 18 18:34:24 EST 2004
Most Python code (outside of Twisted) is synchronous. It expects to go
from point A to point B without interruption. Synchronous code can be made
asynchronous (from the point of view of its caller) in a couple of ways:
1) run it in a "real" (i.e. preemptive) thread
2) rewrite it from top to bottom (ugh)
Sometimes, synchronous code would like to use asynchronous code. However,
by its nature, asynchronous code returns control flow immediately, before
its result is available. Therefore, synchronous code must wait for
asynchronous code to finish, effectively by looping until it has completed.
Like any Python app, a PEAK application starts up running
synchronously. If one is using EventDriven or a more directly
reactor-driven style, then the synchronous code transfers control to
asynchronous code, by running an event loop until the asynchronous code is
finished.
Within that asynchronous code, we sometimes need to call synchronous code,
perhaps complex application/database code. This sort of code is more
convenient to write synchronously, because it doesn't need to respond to
arbitrary events. In general, that sort of code uses blocking I/O (e.g. to
database servers). But, sometimes it needs to use I/O services that were
written for asynchronous I/O. So, again there needs to be a way to wait.
In a sense, one might say that there are two fundamental operations here:
    result = synchronize(eventSource)
    eventSource = asynchronize(synchronousFunction, *args, **kw)
To 'synchronize()' is to wait for the eventSource to fire, returning the
event sent. To 'asynchronize()' is to run 'synchronousFunction' in a
"real" thread, returning an event source that will fire to provide the
return value (or exception) from the function.
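To make that concrete, here's a minimal sketch of the threaded flavor of
both operations, using only the standard library. The one-shot event
source is a stand-in of my own, not a peak.events API:

    import threading

    class OneShotSource:
        # Stand-in for an event source that fires exactly once.
        def __init__(self):
            self._fired = threading.Event()
            self._value = None

        def fire(self, value):
            self._value = value
            self._fired.set()

        def wait(self):
            self._fired.wait()
            return self._value

    def asynchronize(synchronousFunction, *args, **kw):
        # Run the function in a "real" thread, returning a source that
        # fires with ('result', value) or ('error', exception).
        source = OneShotSource()
        def runner():
            try:
                source.fire(('result', synchronousFunction(*args, **kw)))
            except Exception as e:
                source.fire(('error', e))
        threading.Thread(target=runner).start()
        return source

    def synchronize(eventSource):
        # Wait for the source to fire; re-raise errors, return results.
        kind, value = eventSource.wait()
        if kind == 'error':
            raise value
        return value

Note that this 'synchronize()' simply blocks its thread; the
single-threaded alternative discussed below would run a nested event loop
instead of blocking.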
It begins to appear to me as though these concepts make the most sense if
we view synchronous and asynchronous code as always being in different
"real" threads. The main thread starts out synchronous, then runs an async
event loop. Synchronous code is run in another thread, or possibly
multiple threads. This is more or less the model used by Zope's ZServer
and by Twisted: the classic "half-sync/half-async" pattern.
The downside: I'm very uncomfortable with using "real" threads. Their
behavior can vary subtly from platform to platform. Race conditions are
easy to create, resulting in serious-but-hard-to-find bugs. So, I don't
really want to force every program to use them, even if it's just one
"main" thread and one "asynchronizing" thread.
For example, our CGI/FastCGI runner is currently half-async without using
threads. It simply pauses all asynchronous activity while responding to a
web hit. But if we need to use asynchronous I/O *within* a running FastCGI
hit, we don't want to resume accepting connections!
It seems that the sensible thing to do is *not* to reuse the same event
loop from within itself. I had been thinking that we could/should run
scheduler.tick() from inside the synchronous code in order to make I/O
occur. But this is actually wrong. We want *another* scheduler, for our
nested I/O activity. And another ISelector, and so on. In other words,
such code wants to run in another service area from the synchronous
application code, or at least in a context that contains its own, distinct
(and non-Twisted) event loop.
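As a toy illustration (not PEAK code): instead of ticking the outer
scheduler, the nested wait gets a select() call of its own, scoped to just
the descriptors the nested activity cares about, so the outer loop's
readers and writers stay paused:

    import select

    def nested_read(sock, timeout=None):
        # Wait on *only* this socket, rather than re-entering the
        # outer event loop's tick() and servicing its listeners too.
        ready, _, _ = select.select([sock], [], [], timeout)
        return sock.recv(4096) if ready else None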
Now, as Bob Ippolito has pointed out, this structure isn't the right way
to mix sync and async in a GUI application. GUIs simply can't afford to block, in the
general case. So, you really *have* to use threads in that situation,
unless the platform has ways to let you work around it. Thus, there you'll
want a Twisted-style threadpool (Twisted's 'deferToThread' is similar to my
'asynchronize' operation above).
In that model, synchronous code running in a thread can always delegate
async operations back to the main thread, using the equivalent of
reactor.callLater, setting them up with a callback to release a mutex when
the operation is done, and then doing a blocking acquire() on the
mutex. This would make the synchronous code effectively sleep with no CPU
usage until the asynchronous event occurred.
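In Twisted terms, that pattern might look roughly like the following;
'reactor.callFromThread()' is the actual thread-safe entry point, while
'startAsyncOp' is a made-up stand-in for whatever operation is being
delegated:

    import threading

    def blocking_bridge(reactor, startAsyncOp):
        # Runs in a worker thread. 'startAsyncOp(done)' is assumed to
        # begin an async operation in the reactor thread and to call
        # 'done(result)' when that operation completes.
        outcome = []
        mutex = threading.Lock()
        mutex.acquire()                     # start out held

        def done(result):
            outcome.append(result)
            mutex.release()                 # wake the worker thread

        reactor.callFromThread(startAsyncOp, done)
        mutex.acquire()                     # sleep with no CPU usage
        return outcome[0]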
In either approach to implementing 'synchronize', there is a question of
how to get the asynchronous code to use the right scheduler or other
components. For example, in the multithreaded approach, the asynchronous
code needs to use the services provided by the main thread's event loop,
whereas the single-threaded approach requires the use of a new set of
services. This seems to create a coupling between the precise application
model used, and the code.
But wait... why do we need to delegate async operations back to the main
thread in the multithreaded model? If we're in another thread, why do we
need to have the main thread handle I/O? It seems that if we always run a
nested (non-GUI) event loop when we need to synchronize asynchronous code,
we're okay. The synchronous code will work regardless of whether it's in
the main thread or some other thread. Now that's more like it!
So, we still have the problem of getting these nested I/O operations to use
alternate event-based services. I'm thinking that it will be easiest to
manage those services if they all go through a common component: a kind of
"service area" for event-based services. (By the way, by "event-based
services", I mean things like IScheduler, ISelector, etc.)
This service area-like component would be used to launch threads, so
threads could know how to find these services. Indeed, it seems that
rather than directly using component context for these services, we should
instead be using thread-based context. In other words, there should be
something like an 'events.getService(key)' function that locates
thread-specific services. As a side effect of this, we could move to using
functions like 'events.sleep()' in place of referring to a scheduler object
explicitly, since the 'events.sleep()' function could either call
events.getService(IScheduler), or return an object that looked up the
service when needed.
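A quick sketch of what that might look like, using threading.local for
illustration; the string keys and exact names here are inventions of the
moment:

    import threading

    _context = threading.local()

    def setServices(services):
        # Called when a thread is launched, so the thread knows
        # where to find its event-based services.
        _context.services = services

    def getService(key):
        # 'events.getService(key)': locate a thread-specific service
        # such as IScheduler or ISelector.
        return _context.services[key]

    def sleep(interval):
        # 'events.sleep()' holds no scheduler reference; it looks the
        # service up only when actually called.
        return getService('IScheduler').sleep(interval)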
But after giving that some thought, it seems a little too specialized, and
I'm instead going to revisit the "service area" concept to see if there's a
way to do it in a fashion that's a little more fine-grained. (That will be
a different post, though.)
I also need to flesh out the event loop concept more. Specifically, the
open issues have to do with error handling, and devising meaningful
equivalents of run()/crash()/stop(). Mainly, I don't really care for the
aspect of the reactor model that lets you arbitrarily stop it from
inside. That seems quite wrong to me; the application should control the
reactor, not the other way around. So, I want to move to a style like
'eventLoop.runUntil(condition)' (aka 'synchronize(condition)'). If you
want multiple ways to stop the event loop, then you can supply an
'events.AnyOf()' listing them, or raise an error of the appropriate kind.
I also don't like that the reactor forcibly traps errors; I'd prefer our
'eventLoop.runUntil()' to not silence errors unless explicitly
silenced. The current EventDriven command would silence them, of
course. But a nested async loop inside of synchronous code would not,
since it would want to know there was an error.
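Hypothetical usage, given those semantics (the event-source names here
are made up):

    # Stop on whichever condition fires first; errors propagate to us.
    stop = events.AnyOf(connectionLost, shutdownRequested)
    event = eventLoop.runUntil(stop)

    # A top-level runner like EventDriven would silence them instead:
    eventLoop.runUntil(stop, suppressErrors=True)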
Currently, IMainLoop acts as an activity timeout and total run-length
manager. This is handled using a function that controls the run. It
would be nice to be able to convert this to something usable as a parameter
for runUntil. But I'm not yet clear how that could/should be done, as it
would still rely upon a central "activity monitor", a sort of bell for
tasks to ring when things happen. I wonder if this is really a valuable
standalone piece, or whether it should just be made part of EventDriven and
let other application uses set up monitoring threads of their own
design/choosing. Certainly a nested event loop doesn't need anything of
this sort.
I guess this means that IMainLoop should actually stay much the same as it
is now, in interface terms, but it will just be one kind of wrapper over
'eventLoop.runUntil()'. And in time, we'll phase out having
components directly call 'activityOccurred()', in favor of having them
issue events that an application can indirectly route to call
activityOccurred(), so that it's easier to control what actually
constitutes "activity occurring". (Right now, our current mechanism
hardcodes this policy into a variety of components.)
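That rerouting might look something like this; the event-source name and
the callback signature are assumptions:

    # Instead of a component calling activityOccurred() directly, the
    # application decides which events count as "activity":
    component.dataReceived.addCallback(
        lambda source, event: mainLoop.activityOccurred()
    )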
Once all this is in place, then we should be able to do something like:
    eventLoop.spawn(somegenerator()).run()
and have the thread's 'run()' method call
'eventLoop.runUntil(thread.isFinished)'. This would then be a simple
synchronous wrapper around asynchronous operations. With a bit more work,
we could even get it to return any output yielded by somegenerator(). That
is, given:
    def somegenerator():
        yield "some result"
we should be able to do:
    result = eventLoop.spawn(somegenerator()).run()
and 'result' would equal "some result". If there were an error, we would
see the error.
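Here's roughly how that 'run()' might be put together, assuming the
spawned thread object records its outcome ('result' and 'error' are names
I'm inventing for the sketch):

    class ThreadRunner:
        def __init__(self, eventLoop, thread):
            self.eventLoop = eventLoop
            self.thread = thread

        def run(self):
            # Drive the event loop until the spawned thread finishes,
            # then surface its outcome synchronously.
            self.eventLoop.runUntil(self.thread.isFinished)
            if self.thread.error is not None:
                raise self.thread.error     # "we would see the error"
            return self.thread.result       # e.g. "some result"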
I'm also thinking here that IEventLoop will derive from IScheduler and
ISelector, so that all of the I/O and time-based events will be available
from a single object. It's likely that the actual implementation will
delegate to an IScheduler and an ISelector, though, rather than
re-implementing them itself. In that way, we'll end up with a more
pluggable mechanism, since there are many ways to implement a standalone
ISelector, relying only upon an IScheduler. And the IEventLoop-specific
methods will also rely only upon an IScheduler, at least when not using
Twisted.
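Spelled out as a sketch, with the method names and callback signature
assumed:

    class EventLoop:
        # Implements IEventLoop by forwarding to pluggable pieces.
        def __init__(self, scheduler, selector):
            self.scheduler = scheduler
            self.selector = selector

        def sleep(self, secs):              # IScheduler, delegated
            return self.scheduler.sleep(secs)

        def readable(self, stream):         # ISelector, delegated
            return self.selector.readable(stream)

        def runUntil(self, eventSource, suppressErrors=False):
            # The IEventLoop-specific part relies only on tick().
            fired = []
            eventSource.addCallback(lambda source, event: fired.append(event))
            while not fired:
                try:
                    self.scheduler.tick()
                except Exception:
                    if not suppressErrors:
                        raise
                    # a real implementation would log the error here
            return fired[0]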
If one is using Twisted, however, then all of the interfaces will need to
delegate to a reactor, and we won't be able to support runUntil() without
errors being suppressed. We can handle this, however, by throwing an error
in a Twisted runUntil(), unless you request error suppression. Twisted
can't be used for a nested event loop anyway (because of its singleton
reactor design, and the issues Bob has mentioned for Mac OS X), so there's
no point in trying to get errors to pass out of reactor.run() (which
runUntil will call "under the hood", after setting up a callback on its
exit condition to trigger reactor.stop()).
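So the Twisted-backed 'runUntil()' reduces to something like this sketch,
where only reactor.run() and reactor.stop() are real Twisted API:

    def runUntil(self, eventSource, suppressErrors=False):
        if not suppressErrors:
            raise NotImplementedError(
                "Twisted-based event loops require suppressErrors=True"
            )
        # Arrange for the exit condition to stop the reactor, then
        # block in reactor.run(); Twisted traps and logs errors itself.
        eventSource.addCallback(lambda source, event: self.reactor.stop())
        self.reactor.run()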
I think this gets us a design that can co-operate with Twisted, but isn't
hampered by Twisted's limitations when you're not using Twisted. And, we
don't need run()/crash()/stop(). If we really need to exit (e.g. when
forking), we can just raise an error. And it's easy to create an explicit
top-level loop using 'runUntil()', with whatever exit condition(s) you like.
So, I think the end result would look like:
    class IEventLoop(IScheduler, ISelector):

        def runUntil(eventSource, suppressErrors=False):
            """'tick()' repeatedly until 'eventSource' fires, returning event

            If 'suppressErrors' is true, this method will trap and log
            all errors without allowing them to reach the caller.  Note
            that event loop implementations based on Twisted *require*
            that 'suppressErrors' be used."""
And later, if we add a thread pool facility, we would probably add
IThreadPool to the superclasses of IEventLoop.
Our UntwistedReactor will migrate to using runUntil() to implement its
run(), using an ISelector to implement its reader/writer support, and
IScheduler.tick() for its iterate() support. And IMainLoop.run() will also
be implemented using IEventLoop.runUntil(). Thus, UntwistedReactor will be
a complete facade, putting a reactor-like face on the peak.events
framework, providing some backward compatibility in the a3 release. But,
if you use Twisted, then IEventLoop and its kin will just be a facade
making Twisted look like the peak.events framework.
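Sketched out (with the attribute names assumed), the facade could end up
as little more than:

    class UntwistedReactor:
        # A reactor-like face over peak.events, kept for a3 compatibility.
        def run(self):
            self.eventLoop.runUntil(self.stopEvent, suppressErrors=True)

        def stop(self):
            self.stopEvent.fire()           # hypothetical one-shot source

        def iterate(self, delay=0):
            self.eventLoop.tick()           # IScheduler.tick(), delegated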
I don't expect the need for Twisted compatibility to go away for a long,
long time, though. I don't really want to write GUI event loops and
specialized reactors for a zillion platforms, and Twisted has the market
share and mindshare in that space. And I expect that for a long time,
Twisted will have a significant edge in sheer number of mature protocol
implementations. But, I expect over time the improved ease of coding will
tend to catch us up a bit. It will be particularly helpful to be able to
write protocols that Twisted doesn't have using peak.events, while still
using Twisted ones in the same app.