[PEAK] Anybody using reactors?
Phillip J. Eby
pje at telecommunity.com
Wed Jan 14 15:47:55 EST 2004
At 10:13 PM 1/14/04 +0200, alexander smishlajev wrote:
>Phillip J. Eby wrote, at 14.01.2004 20:42:
>
>>>>would work, then?
>>>
>>>it seems so, at least at the first glance.
>>Good to know; I won't worry about such cancellations in peak.events,
>>then, unless somebody else presents a different use case for
>>cancelling/rescheduling.
>
>i found one slightly different case, although it is implemented without
>removeLater (we didn't have such beast at that time). when a running task
>is suspended by another task, it shedules an operator warning to be iisued
>after some time; when the task is resumed, the warning is disabled
>(warning procedure is still run, but does not produce the message).
>
>it seems to me that this case also may be handled with
>
> yeld events.AnyOf(appEvents.resume, scheduler.timeout())
>
>>So far, it seems to me the strength of peak.events is in managing events
>>that happen in sequence, which is *very* hard to do clearly in a
>>reactor-driven style.
>
>i wouldn't say it was that hard. here's reactor callback of one of the
>most complicated components:
I said it was hard to do *clearly*, and I've also found it hard to do
correctly as well. For example, peak.tools.supervisor has many state
management bits that are not at all clear, and had many non-obvious bugs in
the first versions. By contrast, between yesterday and today I did two
different versions of an "all processes are busy" reporting thread that I
expect would've taken all week to get correct in the reactor-driven style.
>(you may wonder why it is done by getWork/doWork, and not by doRead.
>that's because pipes between application components are not select()able
>on windows platform.)
If they are pipes used solely *within* the process, you may find it more
useful to use an events.Distributor and call 'send()' on it, like this:
def producer(scheduler,pipe):
i = 0
while True:
pipe.send(i)
i+=1
yield scheduler.sleep(); events.resume()
def consumer(pipe):
while True:
yield pipe; data = events.resume()
print data
pipe = events.Distributor()
c = events.Thread(consumer(pipe))
p = events.Thread(producer(scheduler,pipe))
Notice that we start the consumer before the producer, so that no data will
be lost. The only downside to this mechanism is that there is no buffering
and if nobody is listening when a send() occurs, the data is dropped on the
floor. Of course, one could always implement those features atop the
current microkernel, I just backed off from putting a Queue type in the
microkernel itself. It raised too many policy questions about what it was
I really wanted in a Queue, garbage collection issues, etc., and was
distracting from getting the more fundamental features in place.
More information about the PEAK
mailing list