source: orange/Orange/OrangeCanvas/scheme/signalmanager.py @ 11465:5c3834e1d2ea

Revision 11465:5c3834e1d2ea, 14.4 KB checked in by Ales Erjavec <ales.erjavec@…>, 12 months ago (diff)

Delay the deletion of the Scheme and/or OWBaseWidget until SignalManager finishes the current update.

Line 
1"""
2Signal Manager
3==============
4
5A SignalManager instance handles the runtime signal propagation between
6widgets in a scheme.
7
8
9"""
10
11import logging
12
13from collections import namedtuple, defaultdict
14from operator import attrgetter, add
15
16
17from PyQt4.QtCore import QObject, QCoreApplication, QEvent
18from PyQt4.QtCore import pyqtSignal as Signal
19
20
21from .scheme import SchemeNode
22
23log = logging.getLogger(__name__)
24
25
26_Signal = namedtuple(
27    "_Signal",
28    ["link",     # link on which the signal is sent
29     "value",    # signal value
30     "id"])      # signal id
31
32
33is_enabled = attrgetter("enabled")
34
35
36class SignalManager(QObject):
37    """
38    Handle all runtime signal propagation for a :clas:`Scheme` instance.
39    The scheme must be passed to the constructor and will become the parent
40    of this object. Furthermore this should happen before any items
41    (nodes, links) are added to the scheme.
42
43    """
44    Running, Stoped, Paused, Error = range(4)
45    """SignalManger state flags."""
46
47    Waiting, Processing = range(2)
48    """SignalManager runtime state flags."""
49
50    stateChanged = Signal(int)
51    """Emitted when the state of the signal manager changes."""
52
53    updatesPending = Signal()
54    """Emitted when signals are added to the queue."""
55
56    processingStarted = Signal([], [SchemeNode])
57    """Emitted right before a `SchemeNode` instance has its inputs
58    updated.
59    """
60
61    processingFinished = Signal([], [SchemeNode])
62    """Emitted right after a `SchemeNode` instance has had its inputs
63    updated.
64    """
65
66    runtimeStateChanged = Signal(int)
67    """Emitted when `SignalManager`'s runtime state changes."""
68
69    def __init__(self, scheme):
70        assert(scheme)
71        QObject.__init__(self, scheme)
72        self._input_queue = []
73
74        # mapping a node to it's current outputs
75        # {node: {channel: {id: signal_value}}}
76        self._node_outputs = {}
77
78        self.__state = SignalManager.Running
79        self.__runtime_state = SignalManager.Waiting
80
81        # A flag indicating if UpdateRequest event should be rescheduled
82        self.__reschedule = False
83
84    def _can_process(self):
85        """
86        Return a bool indicating if the manger can enter the main
87        processing loop.
88
89        """
90        return self.__state not in [SignalManager.Error, SignalManager.Stoped]
91
92    def scheme(self):
93        """
94        Return the parent class:`Scheme` instance.
95        """
96        return self.parent()
97
98    def start(self):
99        """
100        Start the update loop.
101
102        .. note:: The updates will not happen until the control reaches
103                  the Qt event loop.
104
105        """
106        if self.__state != SignalManager.Running:
107            self.__state = SignalManager.Running
108            self.stateChanged.emit(SignalManager.Running)
109            self._update()
110
111    def stop(self):
112        """
113        Stop the update loop.
114
115        .. note:: If the `SignalManager` is currently in `process_queues` it
116                  will still update all current pending signals, but will not
117                  re-enter until `start()` is called again
118
119        """
120        if self.__state != SignalManager.Stoped:
121            self.__state = SignalManager.Stoped
122            self.stateChanged.emit(SignalManager.Stoped)
123
124    def pause(self):
125        """
126        Pause the updates.
127
128        """
129        if self.__state != SignalManager.Paused:
130            self.__state = SignalManager.Paused
131            self.stateChanged.emit(SignalManager.Paused)
132
133    def resume(self):
134        if self.__state == SignalManager.Paused:
135            self.__state = SignalManager.Running
136            self.stateChanged.emit(self.__state)
137            self._update()
138
139    def step(self):
140        if self.__state == SignalManager.Paused:
141            self.process_queued(1)
142
143    def state(self):
144        """
145        Return the current state.
146        """
147        return self.__state
148
149    def _set_runtime_state(self, state):
150        """
151        Set the runtime state.
152
153        Should only be called by `SignalManager` implementations.
154
155        """
156        if self.__runtime_state != state:
157            self.__runtime_state = state
158            self.runtimeStateChanged.emit(self.__runtime_state)
159
160    def runtime_state(self):
161        """
162        Return the runtime state. This can be `SignalManager.Waiting`
163        or `SignalManager.Processing`.
164
165        """
166        return self.__runtime_state
167
168    def on_node_removed(self, node):
169        # remove all pending input signals for node so we don't get stale
170        # references in process_node
171        # NOTE: This does not remove output signals this node. In particular
172        # the final 'None' values might be left on the queue.
173        log.info("Node %r removed. Removing pending signals.",
174                 node.title)
175        self.remove_pending_signals(node)
176
177        del self._node_outputs[node]
178
179    def on_node_added(self, node):
180        self._node_outputs[node] = defaultdict(dict)
181
182    def link_added(self, link):
183        # push all current source values to the sink
184        if link.enabled:
185            log.info("Link added (%s). Scheduling signal data update.", link)
186            self._schedule(self.signals_on_link(link))
187            self._update()
188
189        link.enabled_changed.connect(self.link_enabled_changed)
190
191    def link_removed(self, link):
192        # purge all values in sink's queue
193        log.info("Link removed (%s). Scheduling signal data purge.", link)
194        self.purge_link(link)
195
196    def link_enabled_changed(self, enabled):
197        if enabled:
198            link = self.sender()
199            log.info("Link %s enabled. Scheduling signal data update.", link)
200            self._schedule(self.signals_on_link(link))
201
202    def signals_on_link(self, link):
203        """
204        Return _Signal instances representing the current values
205        present on the link.
206
207        """
208        items = self.link_contents(link)
209        signals = []
210
211        for key, value in items.items():
212            signals.append(_Signal(link, value, key))
213
214        return signals
215
216    def link_contents(self, link):
217        """
218        Return the contents on link.
219        """
220        node, channel = link.source_node, link.source_channel
221
222        return self._node_outputs[node][channel]
223
224    def send(self, node, channel, value, id):
225        """
226        """
227        log.debug("%r sending %r (id: %r) on channel %r",
228                  node.title, type(value), id, channel.name)
229
230        scheme = self.scheme()
231
232        self._node_outputs[node][channel][id] = value
233
234        links = scheme.find_links(source_node=node, source_channel=channel)
235        links = filter(is_enabled, links)
236
237        signals = []
238        for link in links:
239            signals.append(_Signal(link, value, id))
240
241        self._schedule(signals)
242
243    def purge_link(self, link):
244        """
245        Purge the link (send None for all ids currently present)
246        """
247        contents = self.link_contents(link)
248        ids = contents.keys()
249        signals = [_Signal(link, None, id) for id in ids]
250
251        self._schedule(signals)
252
253    def _schedule(self, signals):
254        """
255        Schedule a list of :class:`_Signal` for delivery.
256        """
257        self._input_queue.extend(signals)
258
259        if signals:
260            self.updatesPending.emit()
261
262        self._update()
263
264    def _update_links(self, source_node=None, source_channel=None,
265                      sink_node=None, sink_channel=None):
266        """
267        Schedule update of all enabled links matching the query.
268
269        See :ref:`Scheme.find_links` for description of parameters.
270
271        """
272        links = self.scheme().find_links(source_node=source_node,
273                                         source_channel=source_channel,
274                                         sink_node=sink_node,
275                                         sink_channel=sink_channel)
276        links = filter(is_enabled, links)
277
278        signals = reduce(add, self.signals_on_link, [])
279        self._schedule(signals)
280
281    def _update_link(self, link):
282        """
283        Schedule update of a single link.
284        """
285        signals = self.signals_on_link(link)
286        self._schedule(signals)
287
288    def process_queued(self, max_nodes=None):
289        """
290        Process queued signals.
291        """
292        if self.__runtime_state == SignalManager.Processing:
293            raise RuntimeError("Cannot re-enter 'process_queued'")
294
295        if not self._can_process():
296            raise RuntimeError("Can't process in state %i" % self.__state)
297
298        log.info("Processing queued signals")
299        scheme = self.scheme()
300
301        blocking_nodes = set(self.blocking_nodes())
302
303        blocked_nodes = reduce(set.union,
304                               map(scheme.downstream_nodes, blocking_nodes),
305                               set(blocking_nodes))
306
307        pending = set(self.pending_nodes())
308
309        pending_downstream = reduce(set.union,
310                                    map(scheme.downstream_nodes, pending),
311                                    set())
312
313        log.debug("Pending nodes: %s", pending)
314        log.debug("Blocking nodes: %s", blocking_nodes)
315
316        # nodes on the update front (they have no ancestor which is
317        # either scheduled for update or is in blocking state)
318        node_update_front = list(pending - pending_downstream - blocked_nodes)
319
320        if max_nodes is not None:
321            node_update_front = node_update_front[:max_nodes]
322
323        log.debug("Nodes for update %s",
324                  [node.title for node in node_update_front])
325
326        self._set_runtime_state(SignalManager.Processing)
327        try:
328            for node in node_update_front:
329                self.process_node(node)
330        finally:
331            self._set_runtime_state(SignalManager.Waiting)
332
333    def process_node(self, node):
334        """
335        Process pending input signals for `node`.
336        """
337        signals_in = self.pending_input_signals(node)
338        self.remove_pending_signals(node)
339
340        signals_in = self.compress_signals(signals_in)
341
342        log.debug("Processing %r, sending %i signals.",
343                  node.title, len(signals_in))
344
345        self.processingStarted.emit()
346        self.processingStarted[SchemeNode].emit(node)
347        try:
348            self.send_to_node(node, signals_in)
349        finally:
350            self.processingFinished.emit()
351            self.processingFinished[SchemeNode].emit(node)
352
353    def compress_signals(self, signals):
354        """
355        Compress a list of :class:`_Signal` instances to be delivered.
356
357        The base implementation returns the list unmodified.
358
359        """
360        return signals
361
362    def send_to_node(self, node, signals):
363        """
364        Abstract. Reimplement in subclass.
365
366        Send/notify the :class:`SchemeNode` instance (or whatever
367        object/instance it is a representation of) that it has new inputs
368        as represented by the signals list (list of :class:`_Signal`).
369
370        """
371        raise NotImplementedError
372
373    def is_pending(self, node):
374        """
375        Is `node` (class:`SchemeNode`) scheduled for processing (i.e.
376        it has incoming pending signals).
377
378        """
379        return node in [signal.link.sink_node for signal in self._input_queue]
380
381    def pending_nodes(self):
382        """
383        Return a list of pending nodes (in no particular order).
384        """
385        return list(set(sig.link.sink_node for sig in self._input_queue))
386
387    def pending_input_signals(self, node):
388        """
389        Return a list of pending input signals for node.
390        """
391        return [signal for signal in self._input_queue
392                if node is signal.link.sink_node]
393
394    def remove_pending_signals(self, node):
395        """
396        Remove pending signals for `node`.
397        """
398        for signal in self.pending_input_signals(node):
399            try:
400                self._input_queue.remove(signal)
401            except ValueError:
402                pass
403
404    def blocking_nodes(self):
405        """
406        Return a list of nodes in a blocking state.
407        """
408        scheme = self.scheme()
409        return [node for node in scheme.nodes if self.is_blocking(node)]
410
411    def is_blocking(self, node):
412        return False
413
414    def event(self, event):
415        if event.type() == QEvent.UpdateRequest:
416            if not self.__state == SignalManager.Running:
417                log.debug("Received UpdateRequest event while not "
418                          "in 'Running' state")
419                event.setAccepted(False)
420                return False
421
422            if self.__runtime_state == SignalManager.Processing:
423                log.debug("received UpdateRequest event while in "
424                          "'process_queued'")
425                # This happens if someone calls QCoreApplication.processEvents
426                # from the signal handlers.
427                self.__reschedule = True
428                event.accept()
429                return True
430
431            log.debug("UpdateRequest event, queued signals: %i",
432                      len(self._input_queue))
433            if self._input_queue:
434                self.process_queued()
435            event.accept()
436
437            if self.__reschedule:
438                log.debug("Rescheduling UpdateRequest event")
439                self._update()
440                self.__reschedule = False
441
442            return True
443
444        return QObject.event(self, event)
445
446    def _update(self):
447        """
448        Schedule processing at a later time.
449        """
450        QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
451
452
453def can_enable_dynamic(link, value):
454    """
455    Can the a dynamic `link` (:class:`SchemeLink`) be enabled for`value`.
456    """
457    return isinstance(value, link.sink_type())
458
459
460def compress_signals(signals):
461    """
462    Compress a list of signals.
463    """
464    groups = group_by_all(reversed(signals),
465                          key=lambda sig: (sig.link, sig.id))
466    signals = []
467
468    def has_none(signals):
469        return any(sig.value is None for sig in signals)
470
471    for (link, id), signals_grouped in groups:
472        if len(signals_grouped) > 1 and has_none(signals_grouped[1:]):
473            signals.append(signals_grouped[0])
474            signals.append(_Signal(link, None, id))
475        else:
476            signals.append(signals_grouped[0])
477
478    return list(reversed(signals))
479
480
481def group_by_all(sequence, key=None):
482    order_seen = []
483    groups = {}
484    for item in sequence:
485        if key is not None:
486            item_key = key(item)
487        else:
488            item_key = item
489        if item_key in groups:
490            groups[item_key].append(item)
491        else:
492            groups[item_key] = [item]
493            order_seen.append(item_key)
494
495    return [(key, groups[key]) for key in order_seen]
Note: See TracBrowser for help on using the repository browser.