source: orange/Orange/OrangeCanvas/scheme/signalmanager.py @ 11467:6b291596554f

Revision 11467:6b291596554f, 15.8 KB checked in by Ales Erjavec <ales.erjavec@…>, 12 months ago (diff)

Update only a single pending node in the 'UpdateRequest' event.

Schedule another update if there are more pending nodes on the queue.

Line 
1"""
2Signal Manager
3==============
4
5A SignalManager instance handles the runtime signal propagation between
6widgets in a scheme.
7
8
9"""
10
11import logging
12import itertools
13
14from collections import namedtuple, defaultdict, deque
15from operator import attrgetter, add
16from functools import partial
17
18
19from PyQt4.QtCore import QObject, QCoreApplication, QEvent
20from PyQt4.QtCore import pyqtSignal as Signal
21
22
23from .scheme import SchemeNode
24
log = logging.getLogger(__name__)


# A single value travelling over a link. Fields, in order:
#   link  -- link on which the signal is sent
#   value -- signal value
#   id    -- signal id
_Signal = namedtuple("_Signal", ["link", "value", "id"])


# Predicate returning the `enabled` attribute of its argument.
is_enabled = attrgetter("enabled")
36
37
38class SignalManager(QObject):
39    """
    Handle all runtime signal propagation for a :class:`Scheme` instance.
41    The scheme must be passed to the constructor and will become the parent
42    of this object. Furthermore this should happen before any items
43    (nodes, links) are added to the scheme.
44
45    """
46    Running, Stoped, Paused, Error = range(4)
    """SignalManager state flags."""
48
49    Waiting, Processing = range(2)
50    """SignalManager runtime state flags."""
51
52    stateChanged = Signal(int)
53    """Emitted when the state of the signal manager changes."""
54
55    updatesPending = Signal()
56    """Emitted when signals are added to the queue."""
57
58    processingStarted = Signal([], [SchemeNode])
59    """Emitted right before a `SchemeNode` instance has its inputs
60    updated.
61    """
62
63    processingFinished = Signal([], [SchemeNode])
64    """Emitted right after a `SchemeNode` instance has had its inputs
65    updated.
66    """
67
68    runtimeStateChanged = Signal(int)
69    """Emitted when `SignalManager`'s runtime state changes."""
70
71    def __init__(self, scheme):
72        assert(scheme)
73        QObject.__init__(self, scheme)
74        self._input_queue = []
75
76        # mapping a node to it's current outputs
77        # {node: {channel: {id: signal_value}}}
78        self._node_outputs = {}
79
80        self.__state = SignalManager.Running
81        self.__runtime_state = SignalManager.Waiting
82
83        # A flag indicating if UpdateRequest event should be rescheduled
84        self.__reschedule = False
85
86    def _can_process(self):
87        """
88        Return a bool indicating if the manger can enter the main
89        processing loop.
90
91        """
92        return self.__state not in [SignalManager.Error, SignalManager.Stoped]
93
    def scheme(self):
        """
        Return the parent :class:`Scheme` instance.

        This is the Qt parent of the manager, i.e. the `scheme` object
        that was passed to the constructor.

        """
        return self.parent()
99
100    def start(self):
101        """
102        Start the update loop.
103
104        .. note:: The updates will not happen until the control reaches
105                  the Qt event loop.
106
107        """
108        if self.__state != SignalManager.Running:
109            self.__state = SignalManager.Running
110            self.stateChanged.emit(SignalManager.Running)
111            self._update()
112
113    def stop(self):
114        """
115        Stop the update loop.
116
117        .. note:: If the `SignalManager` is currently in `process_queues` it
118                  will still update all current pending signals, but will not
119                  re-enter until `start()` is called again
120
121        """
122        if self.__state != SignalManager.Stoped:
123            self.__state = SignalManager.Stoped
124            self.stateChanged.emit(SignalManager.Stoped)
125
126    def pause(self):
127        """
128        Pause the updates.
129
130        """
131        if self.__state != SignalManager.Paused:
132            self.__state = SignalManager.Paused
133            self.stateChanged.emit(SignalManager.Paused)
134
135    def resume(self):
136        if self.__state == SignalManager.Paused:
137            self.__state = SignalManager.Running
138            self.stateChanged.emit(self.__state)
139            self._update()
140
141    def step(self):
142        if self.__state == SignalManager.Paused:
143            self.process_queued(1)
144
    def state(self):
        """
        Return the current state.

        The value is one of the `SignalManager` state flags (`Running`,
        `Stoped`, `Paused` or `Error`).

        """
        return self.__state
150
151    def _set_runtime_state(self, state):
152        """
153        Set the runtime state.
154
155        Should only be called by `SignalManager` implementations.
156
157        """
158        if self.__runtime_state != state:
159            self.__runtime_state = state
160            self.runtimeStateChanged.emit(self.__runtime_state)
161
    def runtime_state(self):
        """
        Return the runtime state. This can be `SignalManager.Waiting`
        or `SignalManager.Processing`.

        The state is `Processing` for the duration of `process_queued`
        and `Waiting` otherwise.

        """
        return self.__runtime_state
169
170    def on_node_removed(self, node):
171        # remove all pending input signals for node so we don't get stale
172        # references in process_node
173        # NOTE: This does not remove output signals this node. In particular
174        # the final 'None' values might be left on the queue.
175        log.info("Node %r removed. Removing pending signals.",
176                 node.title)
177        self.remove_pending_signals(node)
178
179        del self._node_outputs[node]
180
181    def on_node_added(self, node):
182        self._node_outputs[node] = defaultdict(dict)
183
184    def link_added(self, link):
185        # push all current source values to the sink
186        if link.enabled:
187            log.info("Link added (%s). Scheduling signal data update.", link)
188            self._schedule(self.signals_on_link(link))
189            self._update()
190
191        link.enabled_changed.connect(self.link_enabled_changed)
192
193    def link_removed(self, link):
194        # purge all values in sink's queue
195        log.info("Link removed (%s). Scheduling signal data purge.", link)
196        self.purge_link(link)
197
198    def link_enabled_changed(self, enabled):
199        if enabled:
200            link = self.sender()
201            log.info("Link %s enabled. Scheduling signal data update.", link)
202            self._schedule(self.signals_on_link(link))
203
204    def signals_on_link(self, link):
205        """
206        Return _Signal instances representing the current values
207        present on the link.
208
209        """
210        items = self.link_contents(link)
211        signals = []
212
213        for key, value in items.items():
214            signals.append(_Signal(link, value, key))
215
216        return signals
217
218    def link_contents(self, link):
219        """
220        Return the contents on link.
221        """
222        node, channel = link.source_node, link.source_channel
223
224        return self._node_outputs[node][channel]
225
226    def send(self, node, channel, value, id):
227        """
228        """
229        log.debug("%r sending %r (id: %r) on channel %r",
230                  node.title, type(value), id, channel.name)
231
232        scheme = self.scheme()
233
234        self._node_outputs[node][channel][id] = value
235
236        links = scheme.find_links(source_node=node, source_channel=channel)
237        links = filter(is_enabled, links)
238
239        signals = []
240        for link in links:
241            signals.append(_Signal(link, value, id))
242
243        self._schedule(signals)
244
245    def purge_link(self, link):
246        """
247        Purge the link (send None for all ids currently present)
248        """
249        contents = self.link_contents(link)
250        ids = contents.keys()
251        signals = [_Signal(link, None, id) for id in ids]
252
253        self._schedule(signals)
254
255    def _schedule(self, signals):
256        """
257        Schedule a list of :class:`_Signal` for delivery.
258        """
259        self._input_queue.extend(signals)
260
261        if signals:
262            self.updatesPending.emit()
263
264        self._update()
265
266    def _update_links(self, source_node=None, source_channel=None,
267                      sink_node=None, sink_channel=None):
268        """
269        Schedule update of all enabled links matching the query.
270
271        See :ref:`Scheme.find_links` for description of parameters.
272
273        """
274        links = self.scheme().find_links(source_node=source_node,
275                                         source_channel=source_channel,
276                                         sink_node=sink_node,
277                                         sink_channel=sink_channel)
278        links = filter(is_enabled, links)
279
280        signals = reduce(add, self.signals_on_link, [])
281        self._schedule(signals)
282
283    def _update_link(self, link):
284        """
285        Schedule update of a single link.
286        """
287        signals = self.signals_on_link(link)
288        self._schedule(signals)
289
290    def process_queued(self, max_nodes=None):
291        """
292        Process queued signals.
293        """
294        if self.__runtime_state == SignalManager.Processing:
295            raise RuntimeError("Cannot re-enter 'process_queued'")
296
297        if not self._can_process():
298            raise RuntimeError("Can't process in state %i" % self.__state)
299
300        log.info("Processing queued signals")
301
302        node_update_front = self.node_update_front()
303
304        if max_nodes is not None:
305            node_update_front = node_update_front[:max_nodes]
306
307        log.debug("Nodes for update %s",
308                  [node.title for node in node_update_front])
309
310        self._set_runtime_state(SignalManager.Processing)
311        try:
312            # TODO: What if the update front changes in the loop?
313            for node in node_update_front:
314                self.process_node(node)
315        finally:
316            self._set_runtime_state(SignalManager.Waiting)
317
318    def process_node(self, node):
319        """
320        Process pending input signals for `node`.
321        """
322        signals_in = self.pending_input_signals(node)
323        self.remove_pending_signals(node)
324
325        signals_in = self.compress_signals(signals_in)
326
327        log.debug("Processing %r, sending %i signals.",
328                  node.title, len(signals_in))
329
330        self.processingStarted.emit()
331        self.processingStarted[SchemeNode].emit(node)
332        try:
333            self.send_to_node(node, signals_in)
334        finally:
335            self.processingFinished.emit()
336            self.processingFinished[SchemeNode].emit(node)
337
    def compress_signals(self, signals):
        """
        Compress a list of :class:`_Signal` instances to be delivered.

        Called by `process_node` with the node's pending input signals;
        the returned list is what gets passed to `send_to_node`.

        The base implementation returns the list unmodified.

        """
        return signals
346
    def send_to_node(self, node, signals):
        """
        Abstract. Reimplement in subclass.

        Send/notify the :class:`SchemeNode` instance (or whatever
        object/instance it is a representation of) that it has new inputs
        as represented by the signals list (list of :class:`_Signal`).

        Called from `process_node` between the `processingStarted` and
        `processingFinished` emissions. The base implementation raises
        `NotImplementedError`.

        """
        raise NotImplementedError
357
358    def is_pending(self, node):
359        """
360        Is `node` (class:`SchemeNode`) scheduled for processing (i.e.
361        it has incoming pending signals).
362
363        """
364        return node in [signal.link.sink_node for signal in self._input_queue]
365
366    def pending_nodes(self):
367        """
368        Return a list of pending nodes (in no particular order).
369        """
370        return list(set(sig.link.sink_node for sig in self._input_queue))
371
372    def pending_input_signals(self, node):
373        """
374        Return a list of pending input signals for node.
375        """
376        return [signal for signal in self._input_queue
377                if node is signal.link.sink_node]
378
379    def remove_pending_signals(self, node):
380        """
381        Remove pending signals for `node`.
382        """
383        for signal in self.pending_input_signals(node):
384            try:
385                self._input_queue.remove(signal)
386            except ValueError:
387                pass
388
389    def blocking_nodes(self):
390        """
391        Return a list of nodes in a blocking state.
392        """
393        scheme = self.scheme()
394        return [node for node in scheme.nodes if self.is_blocking(node)]
395
    def is_blocking(self, node):
        """
        Is `node` in a blocking state.

        Blocking nodes and the nodes that depend on them are excluded
        from the update front (see `node_update_front`). The base
        implementation never blocks and always returns False.

        """
        return False
398
399    def node_update_front(self):
400        """
401        Return a list of nodes on the update front, i.e. nodes scheduled for
402        an update that have no ancestor which is either itself scheduled
403        for update or is in a blocking state)
404
405        .. note::
406            The node's ancestors are only computed over enabled links.
407
408        """
409        scheme = self.scheme()
410
411        blocking_nodes = set(self.blocking_nodes())
412
413        dependents = partial(dependent_nodes, scheme)
414
415        blocked_nodes = reduce(set.union,
416                               map(dependents, blocking_nodes),
417                               set(blocking_nodes))
418
419        pending = set(self.pending_nodes())
420
421        pending_downstream = reduce(set.union,
422                                    map(dependents, pending),
423                                    set())
424
425        log.debug("Pending nodes: %s", pending)
426        log.debug("Blocking nodes: %s", blocking_nodes)
427
428        return list(pending - pending_downstream - blocked_nodes)
429
430    def event(self, event):
431        if event.type() == QEvent.UpdateRequest:
432            if not self.__state == SignalManager.Running:
433                log.debug("Received 'UpdateRequest' event while not "
434                          "in 'Running' state")
435                event.setAccepted(False)
436                return False
437
438            if self.__runtime_state == SignalManager.Processing:
439                log.debug("Received 'UpdateRequest' event while in "
440                          "'process_queued'")
441                # This happens if someone calls QCoreApplication.processEvents
442                # from the signal handlers.
443                self.__reschedule = True
444                event.accept()
445                return True
446
447            log.info("'UpdateRequest' event, queued signals: %i",
448                      len(self._input_queue))
449            if self._input_queue:
450                self.process_queued(max_nodes=1)
451            event.accept()
452
453            if self.__reschedule:
454                log.debug("Rescheduling 'UpdateRequest' event")
455                self._update()
456                self.__reschedule = False
457            elif self.node_update_front():
458                log.debug("More nodes are eligible for an update. "
459                          "Scheduling another update.")
460                self._update()
461
462            return True
463
464        return QObject.event(self, event)
465
466    def _update(self):
467        """
468        Schedule processing at a later time.
469        """
470        QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
471
472
def can_enable_dynamic(link, value):
    """
    Can a dynamic `link` (:class:`SchemeLink`) be enabled for `value`.

    True when `value` is an instance of the type returned by
    `link.sink_type()`.

    """
    sink_type = link.sink_type()
    return isinstance(value, sink_type)
478
479
def compress_signals(signals):
    """
    Compress a list of signals.

    For every (link, id) pair only the most recent signal is kept. If
    any of the older signals for the pair carried a `None` value, a
    `None` signal is placed right before the kept one.

    """
    # Walk the signals newest first so the head of every group is the
    # most recent signal for its (link, id) pair.
    grouped = group_by_all(reversed(signals),
                           key=lambda sig: (sig.link, sig.id))
    compressed = []

    for (link, signal_id), group in grouped:
        latest, older = group[0], group[1:]
        compressed.append(latest)
        if any(sig.value is None for sig in older):
            compressed.append(_Signal(link, None, signal_id))

    # Back to delivery (oldest first) order.
    compressed.reverse()
    return compressed
499
500
def dependent_nodes(scheme, node):
    """
    Return a list of all nodes (in breadth first order) in `scheme` that
    are dependent on `node`. The `node` itself is not part of the result.

    .. note::
        This does not include nodes only reachable by disabled links.

    """
    def enabled_sinks(source):
        # Sink nodes of all enabled links leaving `source`.
        sinks = []
        for link in scheme.find_links(source_node=source):
            if link.enabled:
                sinks.append(link.sink_node)
        return sinks

    reachable = list(traverse_bf(node, enabled_sinks))
    assert reachable[0] is node
    # The traversal starts at `node`; everything after it is a dependent.
    return reachable[1:]
519
520
def traverse_bf(start, expand):
    """
    Generate the items reachable from `start` in breadth first order.

    `expand(item)` must return an iterable of the item's successors.
    Every item is yielded once, starting with `start`; items must be
    hashable.

    """
    seen = set()
    pending = deque()
    pending.append(start)
    while pending:
        current = pending.popleft()
        if current in seen:
            continue
        yield current
        seen.add(current)
        pending.extend(expand(current))
530
531
def group_by_all(sequence, key=None):
    """
    Group all items of `sequence` that share the same key.

    Returns a list of ``(key, items)`` pairs, ordered by the first
    occurrence of each key, with the items of a group in their original
    order. `key` is a function computing the grouping key of an item;
    when it is None the item itself is the key. Keys must be hashable.

    """
    first_seen = []
    buckets = {}
    for element in sequence:
        bucket_key = element if key is None else key(element)
        bucket = buckets.get(bucket_key)
        if bucket is None:
            # First item with this key: open a new group.
            bucket = buckets[bucket_key] = []
            first_seen.append(bucket_key)
        bucket.append(element)

    return [(bucket_key, buckets[bucket_key]) for bucket_key in first_seen]
Note: See TracBrowser for help on using the repository browser.