source: orange/Orange/OrangeCanvas/scheme/signalmanager.py @ 11466:8a76128400a7

Revision 11466:8a76128400a7, 15.6 KB checked in by Ales Erjavec <ales.erjavec@…>, 12 months ago (diff)

Fix handling of 'blocking' nodes.

Line 
1"""
2Signal Manager
3==============
4
5A SignalManager instance handles the runtime signal propagation between
6widgets in a scheme.
7
8
9"""
10
11import logging
12import itertools
13
14from collections import namedtuple, defaultdict, deque
15from operator import attrgetter, add
16from functools import partial
17
18
19from PyQt4.QtCore import QObject, QCoreApplication, QEvent
20from PyQt4.QtCore import pyqtSignal as Signal
21
22
23from .scheme import SchemeNode
24
25log = logging.getLogger(__name__)
26
27
28_Signal = namedtuple(
29    "_Signal",
30    ["link",     # link on which the signal is sent
31     "value",    # signal value
32     "id"])      # signal id
33
34
35is_enabled = attrgetter("enabled")
36
37
38class SignalManager(QObject):
39    """
40    Handle all runtime signal propagation for a :clas:`Scheme` instance.
41    The scheme must be passed to the constructor and will become the parent
42    of this object. Furthermore this should happen before any items
43    (nodes, links) are added to the scheme.
44
45    """
46    Running, Stoped, Paused, Error = range(4)
47    """SignalManger state flags."""
48
49    Waiting, Processing = range(2)
50    """SignalManager runtime state flags."""
51
52    stateChanged = Signal(int)
53    """Emitted when the state of the signal manager changes."""
54
55    updatesPending = Signal()
56    """Emitted when signals are added to the queue."""
57
58    processingStarted = Signal([], [SchemeNode])
59    """Emitted right before a `SchemeNode` instance has its inputs
60    updated.
61    """
62
63    processingFinished = Signal([], [SchemeNode])
64    """Emitted right after a `SchemeNode` instance has had its inputs
65    updated.
66    """
67
68    runtimeStateChanged = Signal(int)
69    """Emitted when `SignalManager`'s runtime state changes."""
70
71    def __init__(self, scheme):
72        assert(scheme)
73        QObject.__init__(self, scheme)
74        self._input_queue = []
75
76        # mapping a node to it's current outputs
77        # {node: {channel: {id: signal_value}}}
78        self._node_outputs = {}
79
80        self.__state = SignalManager.Running
81        self.__runtime_state = SignalManager.Waiting
82
83        # A flag indicating if UpdateRequest event should be rescheduled
84        self.__reschedule = False
85
86    def _can_process(self):
87        """
88        Return a bool indicating if the manger can enter the main
89        processing loop.
90
91        """
92        return self.__state not in [SignalManager.Error, SignalManager.Stoped]
93
94    def scheme(self):
95        """
96        Return the parent class:`Scheme` instance.
97        """
98        return self.parent()
99
100    def start(self):
101        """
102        Start the update loop.
103
104        .. note:: The updates will not happen until the control reaches
105                  the Qt event loop.
106
107        """
108        if self.__state != SignalManager.Running:
109            self.__state = SignalManager.Running
110            self.stateChanged.emit(SignalManager.Running)
111            self._update()
112
113    def stop(self):
114        """
115        Stop the update loop.
116
117        .. note:: If the `SignalManager` is currently in `process_queues` it
118                  will still update all current pending signals, but will not
119                  re-enter until `start()` is called again
120
121        """
122        if self.__state != SignalManager.Stoped:
123            self.__state = SignalManager.Stoped
124            self.stateChanged.emit(SignalManager.Stoped)
125
126    def pause(self):
127        """
128        Pause the updates.
129
130        """
131        if self.__state != SignalManager.Paused:
132            self.__state = SignalManager.Paused
133            self.stateChanged.emit(SignalManager.Paused)
134
135    def resume(self):
136        if self.__state == SignalManager.Paused:
137            self.__state = SignalManager.Running
138            self.stateChanged.emit(self.__state)
139            self._update()
140
141    def step(self):
142        if self.__state == SignalManager.Paused:
143            self.process_queued(1)
144
145    def state(self):
146        """
147        Return the current state.
148        """
149        return self.__state
150
151    def _set_runtime_state(self, state):
152        """
153        Set the runtime state.
154
155        Should only be called by `SignalManager` implementations.
156
157        """
158        if self.__runtime_state != state:
159            self.__runtime_state = state
160            self.runtimeStateChanged.emit(self.__runtime_state)
161
162    def runtime_state(self):
163        """
164        Return the runtime state. This can be `SignalManager.Waiting`
165        or `SignalManager.Processing`.
166
167        """
168        return self.__runtime_state
169
170    def on_node_removed(self, node):
171        # remove all pending input signals for node so we don't get stale
172        # references in process_node
173        # NOTE: This does not remove output signals this node. In particular
174        # the final 'None' values might be left on the queue.
175        log.info("Node %r removed. Removing pending signals.",
176                 node.title)
177        self.remove_pending_signals(node)
178
179        del self._node_outputs[node]
180
181    def on_node_added(self, node):
182        self._node_outputs[node] = defaultdict(dict)
183
184    def link_added(self, link):
185        # push all current source values to the sink
186        if link.enabled:
187            log.info("Link added (%s). Scheduling signal data update.", link)
188            self._schedule(self.signals_on_link(link))
189            self._update()
190
191        link.enabled_changed.connect(self.link_enabled_changed)
192
193    def link_removed(self, link):
194        # purge all values in sink's queue
195        log.info("Link removed (%s). Scheduling signal data purge.", link)
196        self.purge_link(link)
197
198    def link_enabled_changed(self, enabled):
199        if enabled:
200            link = self.sender()
201            log.info("Link %s enabled. Scheduling signal data update.", link)
202            self._schedule(self.signals_on_link(link))
203
204    def signals_on_link(self, link):
205        """
206        Return _Signal instances representing the current values
207        present on the link.
208
209        """
210        items = self.link_contents(link)
211        signals = []
212
213        for key, value in items.items():
214            signals.append(_Signal(link, value, key))
215
216        return signals
217
218    def link_contents(self, link):
219        """
220        Return the contents on link.
221        """
222        node, channel = link.source_node, link.source_channel
223
224        return self._node_outputs[node][channel]
225
226    def send(self, node, channel, value, id):
227        """
228        """
229        log.debug("%r sending %r (id: %r) on channel %r",
230                  node.title, type(value), id, channel.name)
231
232        scheme = self.scheme()
233
234        self._node_outputs[node][channel][id] = value
235
236        links = scheme.find_links(source_node=node, source_channel=channel)
237        links = filter(is_enabled, links)
238
239        signals = []
240        for link in links:
241            signals.append(_Signal(link, value, id))
242
243        self._schedule(signals)
244
245    def purge_link(self, link):
246        """
247        Purge the link (send None for all ids currently present)
248        """
249        contents = self.link_contents(link)
250        ids = contents.keys()
251        signals = [_Signal(link, None, id) for id in ids]
252
253        self._schedule(signals)
254
255    def _schedule(self, signals):
256        """
257        Schedule a list of :class:`_Signal` for delivery.
258        """
259        self._input_queue.extend(signals)
260
261        if signals:
262            self.updatesPending.emit()
263
264        self._update()
265
266    def _update_links(self, source_node=None, source_channel=None,
267                      sink_node=None, sink_channel=None):
268        """
269        Schedule update of all enabled links matching the query.
270
271        See :ref:`Scheme.find_links` for description of parameters.
272
273        """
274        links = self.scheme().find_links(source_node=source_node,
275                                         source_channel=source_channel,
276                                         sink_node=sink_node,
277                                         sink_channel=sink_channel)
278        links = filter(is_enabled, links)
279
280        signals = reduce(add, self.signals_on_link, [])
281        self._schedule(signals)
282
283    def _update_link(self, link):
284        """
285        Schedule update of a single link.
286        """
287        signals = self.signals_on_link(link)
288        self._schedule(signals)
289
290    def process_queued(self, max_nodes=None):
291        """
292        Process queued signals.
293        """
294        if self.__runtime_state == SignalManager.Processing:
295            raise RuntimeError("Cannot re-enter 'process_queued'")
296
297        if not self._can_process():
298            raise RuntimeError("Can't process in state %i" % self.__state)
299
300        log.info("Processing queued signals")
301
302        node_update_front = self.node_update_front()
303
304        if max_nodes is not None:
305            node_update_front = node_update_front[:max_nodes]
306
307        log.debug("Nodes for update %s",
308                  [node.title for node in node_update_front])
309
310        self._set_runtime_state(SignalManager.Processing)
311        try:
312            # TODO: What if the update front changes in the loop?
313            for node in node_update_front:
314                self.process_node(node)
315        finally:
316            self._set_runtime_state(SignalManager.Waiting)
317
318    def process_node(self, node):
319        """
320        Process pending input signals for `node`.
321        """
322        signals_in = self.pending_input_signals(node)
323        self.remove_pending_signals(node)
324
325        signals_in = self.compress_signals(signals_in)
326
327        log.debug("Processing %r, sending %i signals.",
328                  node.title, len(signals_in))
329
330        self.processingStarted.emit()
331        self.processingStarted[SchemeNode].emit(node)
332        try:
333            self.send_to_node(node, signals_in)
334        finally:
335            self.processingFinished.emit()
336            self.processingFinished[SchemeNode].emit(node)
337
338    def compress_signals(self, signals):
339        """
340        Compress a list of :class:`_Signal` instances to be delivered.
341
342        The base implementation returns the list unmodified.
343
344        """
345        return signals
346
347    def send_to_node(self, node, signals):
348        """
349        Abstract. Reimplement in subclass.
350
351        Send/notify the :class:`SchemeNode` instance (or whatever
352        object/instance it is a representation of) that it has new inputs
353        as represented by the signals list (list of :class:`_Signal`).
354
355        """
356        raise NotImplementedError
357
358    def is_pending(self, node):
359        """
360        Is `node` (class:`SchemeNode`) scheduled for processing (i.e.
361        it has incoming pending signals).
362
363        """
364        return node in [signal.link.sink_node for signal in self._input_queue]
365
366    def pending_nodes(self):
367        """
368        Return a list of pending nodes (in no particular order).
369        """
370        return list(set(sig.link.sink_node for sig in self._input_queue))
371
372    def pending_input_signals(self, node):
373        """
374        Return a list of pending input signals for node.
375        """
376        return [signal for signal in self._input_queue
377                if node is signal.link.sink_node]
378
379    def remove_pending_signals(self, node):
380        """
381        Remove pending signals for `node`.
382        """
383        for signal in self.pending_input_signals(node):
384            try:
385                self._input_queue.remove(signal)
386            except ValueError:
387                pass
388
389    def blocking_nodes(self):
390        """
391        Return a list of nodes in a blocking state.
392        """
393        scheme = self.scheme()
394        return [node for node in scheme.nodes if self.is_blocking(node)]
395
396    def is_blocking(self, node):
397        return False
398
399    def node_update_front(self):
400        """
401        Return a list of nodes on the update front, i.e. nodes scheduled for
402        an update that have no ancestor which is either itself scheduled
403        for update or is in a blocking state)
404
405        .. note::
406            The node's ancestors are only computed over enabled links.
407
408        """
409        scheme = self.scheme()
410
411        blocking_nodes = set(self.blocking_nodes())
412
413        dependents = partial(dependent_nodes, scheme)
414
415        blocked_nodes = reduce(set.union,
416                               map(dependents, blocking_nodes),
417                               set(blocking_nodes))
418
419        pending = set(self.pending_nodes())
420
421        pending_downstream = reduce(set.union,
422                                    map(dependents, pending),
423                                    set())
424
425        log.debug("Pending nodes: %s", pending)
426        log.debug("Blocking nodes: %s", blocking_nodes)
427
428        return list(pending - pending_downstream - blocked_nodes)
429
430    def event(self, event):
431        if event.type() == QEvent.UpdateRequest:
432            if not self.__state == SignalManager.Running:
433                log.debug("Received UpdateRequest event while not "
434                          "in 'Running' state")
435                event.setAccepted(False)
436                return False
437
438            if self.__runtime_state == SignalManager.Processing:
439                log.debug("received UpdateRequest event while in "
440                          "'process_queued'")
441                # This happens if someone calls QCoreApplication.processEvents
442                # from the signal handlers.
443                self.__reschedule = True
444                event.accept()
445                return True
446
447            log.debug("UpdateRequest event, queued signals: %i",
448                      len(self._input_queue))
449            if self._input_queue:
450                self.process_queued()
451            event.accept()
452
453            if self.__reschedule:
454                log.debug("Rescheduling UpdateRequest event")
455                self._update()
456                self.__reschedule = False
457
458            return True
459
460        return QObject.event(self, event)
461
462    def _update(self):
463        """
464        Schedule processing at a later time.
465        """
466        QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
467
468
469def can_enable_dynamic(link, value):
470    """
471    Can the a dynamic `link` (:class:`SchemeLink`) be enabled for`value`.
472    """
473    return isinstance(value, link.sink_type())
474
475
476def compress_signals(signals):
477    """
478    Compress a list of signals.
479    """
480    groups = group_by_all(reversed(signals),
481                          key=lambda sig: (sig.link, sig.id))
482    signals = []
483
484    def has_none(signals):
485        return any(sig.value is None for sig in signals)
486
487    for (link, id), signals_grouped in groups:
488        if len(signals_grouped) > 1 and has_none(signals_grouped[1:]):
489            signals.append(signals_grouped[0])
490            signals.append(_Signal(link, None, id))
491        else:
492            signals.append(signals_grouped[0])
493
494    return list(reversed(signals))
495
496
497def dependent_nodes(scheme, node):
498    """
499    Return a list of all nodes (in breadth first order) in `scheme` that
500    are dependent on `node`,
501
502    .. note::
503        This does not include nodes only reachable by disables links.
504
505    """
506    def expand(node):
507        return [link.sink_node
508                for link in scheme.find_links(source_node=node)
509                if link.enabled]
510
511    nodes = list(traverse_bf(node, expand))
512    assert nodes[0] is node
513    # Remove the first item (`node`).
514    return nodes[1:]
515
516
517def traverse_bf(start, expand):
518    queue = deque([start])
519    visited = set()
520    while queue:
521        item = queue.popleft()
522        if item not in visited:
523            yield item
524            visited.add(item)
525            queue.extend(expand(item))
526
527
528def group_by_all(sequence, key=None):
529    order_seen = []
530    groups = {}
531    for item in sequence:
532        if key is not None:
533            item_key = key(item)
534        else:
535            item_key = item
536        if item_key in groups:
537            groups[item_key].append(item)
538        else:
539            groups[item_key] = [item]
540            order_seen.append(item_key)
541
542    return [(key, groups[key]) for key in order_seen]
Note: See TracBrowser for help on using the repository browser.