[PEAK] subscribable/conditional lists
Phillip J. Eby
pje at telecommunity.com
Tue Apr 20 19:49:36 EDT 2004
At 04:22 PM 4/20/04 -0700, John Landahl wrote:
>peak.events could use support for subscribable/conditional lists, which
>could be used as an input queue for a Task-based worker loop. Something
>like the following:
>
>class Work(binding.Component):
>    queue = binding.Make(ConditionalList)
>
>    def worker(self):
>        while True:
>            yield self.queue; queue = events.resume()
>            while queue:
>                doWork(queue.pop(0))
>    worker = binding.Make(events.taskFactory(worker), uponAssembly=True)
>
>    def addWork(self, data):
>        self.queue.append(data)
>
>The worker should suspend until there's work for it in the queue, at
>which point it consumes the data in the queue and waits for more work.
>
>I've come up with a basic implementation (attached); does this seem like a
>decent approach, or is there a better way to do this?

Dunno. This is what semaphores are ordinarily used for, though. That is,
you bump the semaphore up when there's an item in the queue, and down to
remove an item. The advantage of this approach is that it's safe for there
to be multiple tasks pulling items from the queue.

Of course, to really do queueing safely, you need to ensure that the queue
can't grow unboundedly, so you actually need to also have a condition that
indicates when the queue isn't full. That way, a task writing to the queue
can block until there's room to add something.
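
To make the "queue isn't full" condition concrete, here is a minimal sketch of a bounded queue guarded by two counting semaphores: one counts queued items, the other counts free slots. It uses the standard library's asyncio rather than peak.events, and the names BoundedWorkQueue and demo, the capacity parameter, and the None sentinel are all assumptions made for illustration, not part of PEAK.

```python
import asyncio
from collections import deque


class BoundedWorkQueue:
    """Bounded FIFO guarded by two semaphores (asyncio, not peak.events)."""

    def __init__(self, capacity):
        self._items = deque()
        self._items_queued = asyncio.Semaphore(0)        # items ready to take
        self._slots_free = asyncio.Semaphore(capacity)   # room left for writers

    async def put(self, item):
        await self._slots_free.acquire()    # suspend while the queue is full
        self._items.append(item)
        self._items_queued.release()        # let one waiting reader proceed

    async def get(self):
        await self._items_queued.acquire()  # suspend while the queue is empty
        item = self._items.popleft()
        self._slots_free.release()          # let one waiting writer proceed
        return item

    def __len__(self):
        return len(self._items)


async def demo(n_items=10, capacity=3, n_workers=2):
    """Run one producer and several workers; return (items done, max length)."""
    queue = BoundedWorkQueue(capacity)
    done = []
    max_seen = 0

    async def worker():
        while True:
            item = await queue.get()
            if item is None:           # hypothetical sentinel: no more work
                return
            done.append(item)
            await asyncio.sleep(0)     # give other tasks a turn

    async def producer():
        nonlocal max_seen
        for i in range(n_items):
            await queue.put(i)
            max_seen = max(max_seen, len(queue))
        for _ in range(n_workers):
            await queue.put(None)      # one sentinel per worker

    await asyncio.gather(producer(), *(worker() for _ in range(n_workers)))
    return sorted(done), max_seen
```

Calling asyncio.run(demo()) runs the whole thing. The writer can only suspend at the acquire in put, and a reader only at the acquire in get, so the list operations themselves never interleave with another task.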

Anyway, the reason I said "dunno" is because I don't know what your
requirements are. I would guess, however, that all the extra list stuff
you implemented probably isn't very useful for most applications, and from
what I can tell, some of its implementation is broken, too. I'd probably
just redo your example as something like:

class Work(binding.Component):

    queue = binding.Make(list)
    itemsQueued = binding.Make(events.Semaphore)

    def worker(self):
        while True:
            yield self.itemsQueued; events.resume()
            item = self.queue.pop(0)
            self.itemsQueued.take()   # mine! nobody else can have it!
            doWork(item)

    worker = binding.Make(events.taskFactory(worker),
        uponAssembly=True)

    def addWork(self,data):
        self.queue.append(data)   # put it in the queue, then
        self.itemsQueued.put()    # let waiting tasks know it's there

Which is just as simple as your version, but more explicit about where task
switching can or can't take place.