Ignore changes in the amount of whitespace
Differences between version dated 2008-01-07 15:53:54 and 2008-05-23 12:49:13
(spanning 3 versions)
Deletions are marked like this.
Additions are marked like this.
And of course, you can use this boolean test in a rule, to trigger some action::
>>> class AlarmClock(trellis.Component):
... trellis.values(timeout = None)
... timeout = trellis.attr(None)
... trellis.maintain()
... def alert(self):
... if self.timeout:
... print "timed out!"
... alert = trellis.rule(alert)
>>> clock = AlarmClock(timeout=Time[20])
>>> Time.advance(15)
passed::
>>> class Elapsed(trellis.Component):
... trellis.values(duration = 20)
... trellis.rules(has_run_for = lambda self: Time[0])
...
... duration = trellis.attr(20)
... has_run_for = trellis.maintain(lambda self: Time[0])
... trellis.maintain()
... def alarm(self):
... if self.has_run_for[self.duration]:
... print "timed out!"
... alarm = trellis.rule(alarm)
>>> t = Elapsed() # Capture a start time
>>> Time.advance(15) # duration is 20, so no alarm yet
long a process has been idle (or busy)::
>>> class IdleTimer(trellis.Component):
... trellis.values(
... idle_for = activity.NOT_YET,
... trellis.attrs(
... idle_timeout = 20,
... busy = False,
... )
... trellis.rules(
... idle_for = lambda self:
... self.idle_for.begins_with(not self.busy)
... idle_for = trellis.maintain(
... lambda self: self.idle_for.begins_with(not self.busy),
... initially = activity.NOT_YET
... )
... trellis.maintain()
... def alarm(self):
... if self.idle_for[self.idle_timeout]:
... print "timed out!"
... alarm = trellis.rule(alarm)
The way this code works, is that initially the ``idle_for`` timer is equal to
the special ``NOT_YET`` value, representing a moment that will never be
flush(`count=None`)
Run up to `count` outstanding ``call()`` callbacks, if any are pending. If
`count` is ``Non``, run all pending callbacks.
`count` is ``None``, run all pending callbacks.
The ``poll()`` and ``flush()`` are mainly intended for your convenience when
writing tests of code that would ordinarily be run inside an event loop. In
You can check an event loop's status using its ``running`` and
``stop_requested`` attributes, which are both normally false::
>>> @trellis.ObserverCell
>>> @trellis.Performer
... def LoopWatch():
... print "Running =", EventLoop.running
... print "Stopping =", EventLoop.stop_requested
As you can see, the ``running`` attribute turns true once the event loop starts
running. When ``stop()`` is called, the ``stop_requested`` attribute becomes
true, and then both ``running``and ``stop_requested`` return to their normal
true, and then both ``running`` and ``stop_requested`` return to their normal
values.
state along the way. For example::
>>> class TaskExample(trellis.Component):
... trellis.values(
... trellis.attrs(
... start = False,
... stop = False
... )
can rewrite our example like this, for more modularity::
>>> class TaskExample(trellis.Component):
... trellis.values(
... trellis.attrs(
... start = False,
... stop = False
... )
the attributes and methods you'll need to implement:
_ticker
This should be a ``@trellis.observer`` that only executes when the
This should be a ``@trellis.perform`` rule that only executes when the
event loop is running, and handles scheduling for ``Time.tick()`` calls.
When ``self._next_time`` is not ``None``, it is a value indicating the
number of seconds until the next scheduled event, and you should arrange
>>> class MyEventLoop(activity.EventLoop):
... context.replaces(activity.EventLoop) # <-- must have this line!
...
... @trellis.observer
... @trellis.perform
... def _ticker(self):
... if self.running:
... if Time.auto_update:
For more examples, check out the source code of the ``TwistedEventLoop`` and
``WXEventLoop`` classes in ``peak.events.activity``: they're both quite short.