source: orange/Orange/OrangeCanvas/scheme/signalmanager.py @ 11639:ad4d94a25e19

Revision 11639:ad4d94a25e19, 15.9 KB checked in by Ales Erjavec <ales.erjavec@…>, 9 months ago (diff)

Refactored the OWWidget management out of WidgetsScheme and SignalManager.

Line 
1"""
2Signal Manager
3==============
4
5A SignalManager instance handles the runtime signal propagation between
6widgets in a scheme.
7
8
9"""
10
11import logging
12import itertools
13
14from collections import namedtuple, defaultdict, deque
15from operator import attrgetter, add
16from functools import partial
17
18
19from PyQt4.QtCore import QObject, QCoreApplication, QEvent
20from PyQt4.QtCore import pyqtSignal as Signal
21
22
23from .scheme import SchemeNode
24
# Module level logger; all signal manager diagnostics are emitted through it.
log = logging.getLogger(__name__)


# Predicate returning the `enabled` attribute of its argument (used to
# filter links).
is_enabled = attrgetter("enabled")


# A single queued signal. Fields, in order:
#   link  - link on which the signal is sent
#   value - signal value
#   id    - signal id
_Signal = namedtuple("_Signal", "link value id")
36
37
class SignalManager(QObject):
    """
    Handle all runtime signal propagation for a :class:`Scheme` instance.

    The scheme must be passed to the constructor and will become the parent
    of this object. Furthermore this should happen before any items
    (nodes, links) are added to the scheme.

    """
    Running, Stoped, Paused, Error = range(4)
    """SignalManager state flags."""

    Waiting, Processing = range(2)
    """SignalManager runtime state flags."""

    stateChanged = Signal(int)
    """Emitted when the state of the signal manager changes."""

    updatesPending = Signal()
    """Emitted when signals are added to the queue."""

    processingStarted = Signal([], [SchemeNode])
    """Emitted right before a `SchemeNode` instance has its inputs
    updated.
    """

    processingFinished = Signal([], [SchemeNode])
    """Emitted right after a `SchemeNode` instance has had its inputs
    updated.
    """

    runtimeStateChanged = Signal(int)
    """Emitted when `SignalManager`'s runtime state changes."""

    def __init__(self, scheme):
        """
        Construct the manager with `scheme` as its (QObject) parent.
        """
        assert(scheme)
        QObject.__init__(self, scheme)

        # Queue of `_Signal` instances waiting to be delivered.
        self._input_queue = []

        # mapping a node to its current outputs
        # {node: {channel: {id: signal_value}}}
        self._node_outputs = {}

        self.__state = SignalManager.Running
        self.__runtime_state = SignalManager.Waiting

        # A flag indicating if UpdateRequest event should be rescheduled
        self.__reschedule = False

    def _can_process(self):
        """
        Return a bool indicating if the manager can enter the main
        processing loop.

        """
        return self.__state not in [SignalManager.Error, SignalManager.Stoped]

    def scheme(self):
        """
        Return the parent :class:`Scheme` instance.
        """
        return self.parent()

    def start(self):
        """
        Start the update loop.

        .. note:: The updates will not happen until the control reaches
                  the Qt event loop.

        """
        if self.__state != SignalManager.Running:
            self.__state = SignalManager.Running
            self.stateChanged.emit(SignalManager.Running)
            self._update()

    def stop(self):
        """
        Stop the update loop.

        .. note:: If the `SignalManager` is currently in `process_queues` it
                  will still update all current pending signals, but will not
                  re-enter until `start()` is called again

        """
        if self.__state != SignalManager.Stoped:
            self.__state = SignalManager.Stoped
            self.stateChanged.emit(SignalManager.Stoped)

    def pause(self):
        """
        Pause the updates.

        """
        if self.__state != SignalManager.Paused:
            self.__state = SignalManager.Paused
            self.stateChanged.emit(SignalManager.Paused)

    def resume(self):
        """
        Resume the updates; has an effect only in the `Paused` state.
        """
        if self.__state == SignalManager.Paused:
            self.__state = SignalManager.Running
            self.stateChanged.emit(self.__state)
            self._update()

    def step(self):
        """
        Process a single node from the update front while `Paused`.
        """
        if self.__state == SignalManager.Paused:
            self.process_queued(1)

    def state(self):
        """
        Return the current state.
        """
        return self.__state

    def _set_runtime_state(self, state):
        """
        Set the runtime state.

        Should only be called by `SignalManager` implementations.

        """
        if self.__runtime_state != state:
            self.__runtime_state = state
            self.runtimeStateChanged.emit(self.__runtime_state)

    def runtime_state(self):
        """
        Return the runtime state. This can be `SignalManager.Waiting`
        or `SignalManager.Processing`.

        """
        return self.__runtime_state

    def on_node_removed(self, node):
        """
        Drop the pending input signals and the stored outputs of `node`.
        """
        # remove all pending input signals for node so we don't get
        # stale references in process_node.
        # NOTE: This does not remove output signals for this node. In
        # particular the final 'None' will be delivered to the sink
        # nodes even after the source node is no longer in the scheme.
        log.info("Node %r removed. Removing pending signals.",
                 node.title)
        self.remove_pending_signals(node)

        del self._node_outputs[node]

    def on_node_added(self, node):
        """
        Start tracking the outputs of `node`.
        """
        self._node_outputs[node] = defaultdict(dict)

    def link_added(self, link):
        """
        Schedule delivery of the current source values over a new `link`
        (if it is enabled) and start tracking its `enabled` state.
        """
        # push all current source values to the sink
        if link.enabled:
            log.info("Link added (%s). Scheduling signal data update.", link)
            self._schedule(self.signals_on_link(link))
            self._update()

        link.enabled_changed.connect(self.link_enabled_changed)

    def link_removed(self, link):
        """
        Schedule a purge (`None` for every id) of a removed `link`.
        """
        # purge all values in sink's queue
        log.info("Link removed (%s). Scheduling signal data purge.", link)
        self.purge_link(link)
        # NOTE(review): `enabled_changed` (connected in `link_added`) is not
        # disconnected here -- confirm whether that is intended.

    def link_enabled_changed(self, enabled):
        """
        Slot for a link's `enabled_changed` signal. When the link becomes
        enabled its current contents are scheduled for delivery.
        """
        if enabled:
            link = self.sender()
            log.info("Link %s enabled. Scheduling signal data update.", link)
            self._schedule(self.signals_on_link(link))

    def signals_on_link(self, link):
        """
        Return _Signal instances representing the current values
        present on the link.

        """
        items = self.link_contents(link)
        return [_Signal(link, value, key) for key, value in items.items()]

    def link_contents(self, link):
        """
        Return the contents on link, i.e. the `{id: value}` mapping stored
        for the link's source node and source channel.
        """
        node, channel = link.source_node, link.source_channel

        return self._node_outputs[node][channel]

    def send(self, node, channel, value, id):
        """
        Record `value` (under `id`) as the current output of `node` on
        `channel` and schedule its delivery over all enabled links
        originating from that node/channel.
        """
        log.debug("%r sending %r (id: %r) on channel %r",
                  node.title, type(value), id, channel.name)

        scheme = self.scheme()

        self._node_outputs[node][channel][id] = value

        links = scheme.find_links(source_node=node, source_channel=channel)
        links = filter(is_enabled, links)

        signals = [_Signal(link, value, id) for link in links]

        self._schedule(signals)

    def purge_link(self, link):
        """
        Purge the link (send None for all ids currently present)
        """
        contents = self.link_contents(link)
        ids = contents.keys()
        signals = [_Signal(link, None, id) for id in ids]

        self._schedule(signals)

    def _schedule(self, signals):
        """
        Schedule a list of :class:`_Signal` for delivery.
        """
        self._input_queue.extend(signals)

        if signals:
            self.updatesPending.emit()

        self._update()

    def _update_links(self, source_node=None, source_channel=None,
                      sink_node=None, sink_channel=None):
        """
        Schedule update of all enabled links matching the query.

        See :ref:`Scheme.find_links` for description of parameters.

        """
        links = self.scheme().find_links(source_node=source_node,
                                         source_channel=source_channel,
                                         sink_node=sink_node,
                                         sink_channel=sink_channel)

        # Collect the current contents of every enabled matching link.
        # (Previously the bound method itself was handed to `reduce` as
        # the iterable, which raised a TypeError on every call.)
        signals = []
        for link in filter(is_enabled, links):
            signals.extend(self.signals_on_link(link))

        self._schedule(signals)

    def _update_link(self, link):
        """
        Schedule update of a single link.
        """
        signals = self.signals_on_link(link)
        self._schedule(signals)

    def process_queued(self, max_nodes=None):
        """
        Process queued signals.

        If `max_nodes` is not None at most that many nodes from the
        current update front are processed.

        Raises a `RuntimeError` if called re-entrantly (while already
        processing) or while the manager is in a state in which it can
        not process (`Error` or `Stoped`).

        """
        if self.__runtime_state == SignalManager.Processing:
            raise RuntimeError("Cannot re-enter 'process_queued'")

        if not self._can_process():
            raise RuntimeError("Can't process in state %i" % self.__state)

        log.info("Processing queued signals")

        node_update_front = self.node_update_front()

        if max_nodes is not None:
            node_update_front = node_update_front[:max_nodes]

        log.debug("Nodes for update %s",
                  [node.title for node in node_update_front])

        self._set_runtime_state(SignalManager.Processing)
        try:
            # TODO: What if the update front changes in the loop?
            for node in node_update_front:
                self.process_node(node)
        finally:
            # Always leave the 'Processing' state, even if a node raised.
            self._set_runtime_state(SignalManager.Waiting)

    def process_node(self, node):
        """
        Process pending input signals for `node`.
        """
        signals_in = self.pending_input_signals(node)
        self.remove_pending_signals(node)

        signals_in = self.compress_signals(signals_in)

        log.debug("Processing %r, sending %i signals.",
                  node.title, len(signals_in))

        self.processingStarted.emit()
        self.processingStarted[SchemeNode].emit(node)
        try:
            self.send_to_node(node, signals_in)
        finally:
            self.processingFinished.emit()
            self.processingFinished[SchemeNode].emit(node)

    def compress_signals(self, signals):
        """
        Compress a list of :class:`_Signal` instances to be delivered.

        The base implementation returns the list unmodified.

        """
        return signals

    def send_to_node(self, node, signals):
        """
        Abstract. Reimplement in subclass.

        Send/notify the :class:`SchemeNode` instance (or whatever
        object/instance it is a representation of) that it has new inputs
        as represented by the signals list (list of :class:`_Signal`).

        """
        raise NotImplementedError

    def is_pending(self, node):
        """
        Is `node` (class:`SchemeNode`) scheduled for processing (i.e.
        it has incoming pending signals).

        """
        return node in [signal.link.sink_node for signal in self._input_queue]

    def pending_nodes(self):
        """
        Return a list of pending nodes (in no particular order).
        """
        return list(set(sig.link.sink_node for sig in self._input_queue))

    def pending_input_signals(self, node):
        """
        Return a list of pending input signals for node.
        """
        return [signal for signal in self._input_queue
                if node is signal.link.sink_node]

    def remove_pending_signals(self, node):
        """
        Remove pending signals for `node`.
        """
        # Filter in place (the queue list object is kept) using the same
        # identity test as `pending_input_signals`. This is a single pass
        # and never compares signal values with `==`.
        self._input_queue[:] = [signal for signal in self._input_queue
                                if signal.link.sink_node is not node]

    def blocking_nodes(self):
        """
        Return a list of nodes in a blocking state.
        """
        scheme = self.scheme()
        return [node for node in scheme.nodes if self.is_blocking(node)]

    def is_blocking(self, node):
        """
        Is `node` in a blocking state. The base implementation always
        returns False.
        """
        return False

    def node_update_front(self):
        """
        Return a list of nodes on the update front, i.e. nodes scheduled for
        an update that have no ancestor which is either itself scheduled
        for update or is in a blocking state)

        .. note::
            The node's ancestors are only computed over enabled links.

        """
        scheme = self.scheme()

        blocking_nodes = set(self.blocking_nodes())

        # Blocking nodes together with everything downstream of them.
        blocked_nodes = set(blocking_nodes)
        for node in blocking_nodes:
            blocked_nodes.update(dependent_nodes(scheme, node))

        pending = set(self.pending_nodes())

        # Nodes that lie downstream of some pending node.
        pending_downstream = set()
        for node in pending:
            pending_downstream.update(dependent_nodes(scheme, node))

        log.debug("Pending nodes: %s", pending)
        log.debug("Blocking nodes: %s", blocking_nodes)

        return list(pending - pending_downstream - blocked_nodes)

    def event(self, event):
        """
        Reimplemented to handle `QEvent.UpdateRequest` events (posted by
        `_update`), processing at most one node per event.
        """
        if event.type() == QEvent.UpdateRequest:
            if not self.__state == SignalManager.Running:
                log.debug("Received 'UpdateRequest' event while not "
                          "in 'Running' state")
                event.setAccepted(False)
                return False

            if self.__runtime_state == SignalManager.Processing:
                log.debug("Received 'UpdateRequest' event while in "
                          "'process_queued'")
                # This happens if someone calls QCoreApplication.processEvents
                # from the signal handlers.
                self.__reschedule = True
                event.accept()
                return True

            log.info("'UpdateRequest' event, queued signals: %i",
                      len(self._input_queue))
            if self._input_queue:
                self.process_queued(max_nodes=1)
            event.accept()

            if self.__reschedule:
                log.debug("Rescheduling 'UpdateRequest' event")
                self._update()
                self.__reschedule = False
            elif self.node_update_front():
                log.debug("More nodes are eligible for an update. "
                          "Scheduling another update.")
                self._update()

            return True

        return QObject.event(self, event)

    def _update(self):
        """
        Schedule processing at a later time.
        """
        QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
472
473
def can_enable_dynamic(link, value):
    """
    Return True if `value` is an instance of the type reported by
    `link.sink_type()`, i.e. whether a dynamic `link`
    (:class:`SchemeLink`) can be enabled for `value`.
    """
    accepted_type = link.sink_type()
    return isinstance(value, accepted_type)
479
480
def compress_signals(signals):
    """
    Compress a list of signals.

    For every (link, id) pair only the most recent signal is kept. If any
    of the earlier signals of that pair carried a `None` value, a `None`
    signal is placed immediately before the kept one.
    """
    # Walk the signals newest-first so the head of each group is the
    # latest signal for its (link, id) pair.
    grouped = group_by_all(reversed(signals),
                           key=lambda sig: (sig.link, sig.id))
    compressed = []

    for (link, signal_id), group in grouped:
        latest, earlier = group[0], group[1:]
        compressed.append(latest)
        if any(sig.value is None for sig in earlier):
            compressed.append(_Signal(link, None, signal_id))

    # The result was built newest-first; restore delivery order.
    compressed.reverse()
    return compressed
500
501
def dependent_nodes(scheme, node):
    """
    Return a list of all nodes (in breadth first order) in `scheme` that
    are dependent on `node`,

    .. note::
        This does not include nodes only reachable by disabled links.

    """
    def enabled_sinks(source):
        # Direct successors of `source` over enabled links only.
        sinks = []
        for link in scheme.find_links(source_node=source):
            if link.enabled:
                sinks.append(link.sink_node)
        return sinks

    visit_order = list(traverse_bf(node, enabled_sinks))
    assert visit_order[0] is node
    # The traversal starts at `node` itself; drop it from the result.
    del visit_order[0]
    return visit_order
520
521
def traverse_bf(start, expand):
    """
    Yield items reachable from `start` in breadth first order.

    `expand(item)` must return an iterable of the item's successors.
    Every item is yielded (and expanded) at most once.
    """
    seen = set()
    pending = deque()
    pending.append(start)
    while pending:
        current = pending.popleft()
        if current in seen:
            continue
        yield current
        seen.add(current)
        pending.extend(expand(current))
531
532
def group_by_all(sequence, key=None):
    """
    Group all items of `sequence` by `key(item)` (the item itself when
    `key` is None).

    Return a list of `(group_key, items)` tuples, ordered by the first
    occurrence of each key; the items within a group keep their original
    relative order.
    """
    if key is not None:
        key_of = key
    else:
        key_of = lambda element: element

    first_seen = []
    buckets = {}
    for element in sequence:
        group_key = key_of(element)
        bucket = buckets.get(group_key)
        if bucket is None:
            bucket = buckets[group_key] = []
            first_seen.append(group_key)
        bucket.append(element)

    return [(group_key, buckets[group_key]) for group_key in first_seen]
Note: See TracBrowser for help on using the repository browser.