source: orange/Orange/OrangeCanvas/scheme/signalmanager.py @ 11269:b9a89af169b2

Revision 11269:b9a89af169b2, 14.4 KB checked in by Ales Erjavec <ales.erjavec@…>, 15 months ago (diff)

Replaced old orngSignalManager.SignalManager class.

Line 
1"""
2Signal Manager
3==============
4
5A SignalManager instance handles the runtime signal propagation between
6widgets in a scheme.
7
8
9"""
10
11import logging
12
13from collections import namedtuple, defaultdict
14from operator import attrgetter, add
15
16
17from PyQt4.QtCore import QObject, QCoreApplication, QEvent
18from PyQt4.QtCore import pyqtSignal as Signal
19
20
21from .scheme import SchemeNode
22
# Module level logger.
log = logging.getLogger(__name__)


# Record describing one value travelling over a link. Fields:
#   link  -- link on which the signal is sent
#   value -- signal value
#   id    -- signal id
_Signal = namedtuple("_Signal", "link value id")


# Getter for the ``enabled`` attribute of its argument (used to filter links).
is_enabled = attrgetter("enabled")
34
35
class SignalManager(QObject):
    """
    Handle all runtime signal propagation for a :class:`Scheme` instance.

    The scheme must be passed to the constructor and becomes the Qt parent
    of this object. Furthermore this should happen before any items
    (nodes, links) are added to the scheme.

    Values sent by nodes are queued and delivered later, when an
    ``UpdateRequest`` event posted by :func:`_update` is handled in
    :func:`event`. Subclasses must reimplement :func:`send_to_node` to do
    the actual delivery.

    """
    # NOTE: ``Stoped`` (sic) is part of the public interface, the spelling
    # is kept for backward compatibility.
    Running, Stoped, Paused, Error = range(4)
    """SignalManager state flags."""

    Waiting, Processing = range(2)
    """SignalManager runtime state flags."""

    stateChanged = Signal(int)
    """Emitted when the state of the signal manager changes."""

    updatesPending = Signal()
    """Emitted when signals are added to the queue."""

    processingStarted = Signal([], [SchemeNode])
    """Emitted right before a `SchemeNode` instance has its inputs
    updated.
    """

    processingFinished = Signal([], [SchemeNode])
    """Emitted right after a `SchemeNode` instance has had its inputs
    updated.
    """

    runtimeStateChanged = Signal(int)
    """Emitted when `SignalManager`'s runtime state changes."""

    def __init__(self, scheme):
        """
        Initialize the manager with `scheme` as its (Qt) parent.
        """
        assert scheme
        QObject.__init__(self, scheme)

        # queue of pending `_Signal` instances (in the order scheduled)
        self._input_queue = []

        # mapping a node to its current outputs
        # {node: {channel: {id: signal_value}}}
        self._node_outputs = {}

        self.__state = SignalManager.Running
        self.__runtime_state = SignalManager.Waiting

        # A flag indicating if UpdateRequest event should be rescheduled
        self.__reschedule = False

    def _can_process(self):
        """
        Return a bool indicating if the manager can enter the main
        processing loop (i.e. it is neither in `Error` nor `Stoped` state).

        """
        return self.__state not in [SignalManager.Error, SignalManager.Stoped]

    def scheme(self):
        """
        Return the parent :class:`Scheme` instance.
        """
        return self.parent()

    def start(self):
        """
        Start the update loop.

        .. note:: The updates will not happen until the control reaches
                  the Qt event loop.

        """
        if self.__state != SignalManager.Running:
            self.__state = SignalManager.Running
            self.stateChanged.emit(SignalManager.Running)
            self._update()

    def stop(self):
        """
        Stop the update loop.

        .. note:: If the `SignalManager` is currently in `process_queued` it
                  will still update all current pending signals, but will not
                  re-enter until `start()` is called again

        """
        if self.__state != SignalManager.Stoped:
            self.__state = SignalManager.Stoped
            self.stateChanged.emit(SignalManager.Stoped)

    def pause(self):
        """
        Pause the updates.

        """
        if self.__state != SignalManager.Paused:
            self.__state = SignalManager.Paused
            self.stateChanged.emit(SignalManager.Paused)

    def resume(self):
        """
        Resume the updates if the manager is paused (no-op otherwise).
        """
        if self.__state == SignalManager.Paused:
            self.__state = SignalManager.Running
            self.stateChanged.emit(self.__state)
            self._update()

    def step(self):
        """
        While paused, process the pending signals of at most one node.
        """
        if self.__state == SignalManager.Paused:
            self.process_queued(1)

    def state(self):
        """
        Return the current state.
        """
        return self.__state

    def _set_runtime_state(self, state):
        """
        Set the runtime state and emit `runtimeStateChanged` if it changed.

        Should only be called by `SignalManager` implementations.

        """
        if self.__runtime_state != state:
            self.__runtime_state = state
            self.runtimeStateChanged.emit(self.__runtime_state)

    def runtime_state(self):
        """
        Return the runtime state. This can be `SignalManager.Waiting`
        or `SignalManager.Processing`.

        """
        return self.__runtime_state

    def on_node_removed(self, node):
        """
        Drop the pending input signals and the stored outputs of `node`.
        """
        # remove all pending input signals for node so we don't get stale
        # references in process_node
        # NOTE: This does not remove output signals of this node. In
        # particular the final 'None' values might be left on the queue.
        log.info("Node %r removed. Removing pending signals.", node)
        self.remove_pending_signals(node)

        del self._node_outputs[node]

    def on_node_added(self, node):
        """
        Start tracking the outputs of `node`.
        """
        self._node_outputs[node] = defaultdict(dict)

    def link_added(self, link):
        """
        Schedule delivery of the current source values over the new `link`
        (if it is enabled) and start tracking its `enabled_changed` signal.
        """
        # push all current source values to the sink
        if link.enabled:
            log.info("Link added (%s). Scheduling signal data update.", link)
            # _schedule() posts the update request itself.
            self._schedule(self.signals_on_link(link))

        link.enabled_changed.connect(self.link_enabled_changed)

    def link_removed(self, link):
        """
        Schedule `None` for every id present on `link` and stop tracking
        its `enabled_changed` signal.
        """
        # purge all values in sink's queue
        log.info("Link removed (%s). Scheduling signal data purge.", link)
        self.purge_link(link)

        # Undo the connection made in `link_added`, otherwise toggling a
        # removed link would still schedule signals over it.
        try:
            link.enabled_changed.disconnect(self.link_enabled_changed)
        except TypeError:
            # PyQt raises TypeError if the slot was never connected.
            log.debug("Link %s was not connected to the signal manager.",
                      link)

    def link_enabled_changed(self, enabled):
        """
        Slot for a link's `enabled_changed` signal; when the sending link
        was enabled schedule delivery of its current contents.
        """
        if enabled:
            link = self.sender()
            log.info("Link %s enabled. Scheduling signal data update.", link)
            self._schedule(self.signals_on_link(link))

    def signals_on_link(self, link):
        """
        Return _Signal instances representing the current values
        present on the link.

        """
        contents = self.link_contents(link)
        return [_Signal(link, value, key) for key, value in contents.items()]

    def link_contents(self, link):
        """
        Return the contents on link (the ``{id: value}`` mapping stored for
        the link's source node and source channel).
        """
        node, channel = link.source_node, link.source_channel

        return self._node_outputs[node][channel]

    def send(self, node, channel, value, id):
        """
        Record `value` (under `id`) as the current output of `node` on
        `channel` and schedule its delivery over all enabled links
        originating from that channel.
        """
        log.debug("%r sending %r (id: %r) on channel %r",
                  node.title, type(value), id, channel.name)

        scheme = self.scheme()

        self._node_outputs[node][channel][id] = value

        links = scheme.find_links(source_node=node, source_channel=channel)
        signals = [_Signal(link, value, id)
                   for link in links if is_enabled(link)]

        self._schedule(signals)

    def purge_link(self, link):
        """
        Purge the link (send None for all ids currently present)
        """
        contents = self.link_contents(link)
        signals = [_Signal(link, None, key) for key in contents]

        self._schedule(signals)

    def _schedule(self, signals):
        """
        Schedule a list of :class:`_Signal` for delivery.

        `updatesPending` is emitted only if `signals` is not empty; an
        update request is posted in either case.
        """
        self._input_queue.extend(signals)

        if signals:
            self.updatesPending.emit()

        self._update()

    def _update_links(self, source_node=None, source_channel=None,
                      sink_node=None, sink_channel=None):
        """
        Schedule update of all enabled links matching the query.

        See :ref:`Scheme.find_links` for description of parameters.

        """
        links = self.scheme().find_links(source_node=source_node,
                                         source_channel=source_channel,
                                         sink_node=sink_node,
                                         sink_channel=sink_channel)

        # Collect the current contents of every enabled matching link.
        signals = []
        for link in links:
            if is_enabled(link):
                signals.extend(self.signals_on_link(link))

        self._schedule(signals)

    def _update_link(self, link):
        """
        Schedule update of a single link.
        """
        signals = self.signals_on_link(link)
        self._schedule(signals)

    def process_queued(self, max_nodes=None):
        """
        Process queued signals.

        Only nodes on the 'update front' are processed, i.e. pending nodes
        that are not downstream of another pending node or of a blocking
        node. If `max_nodes` is not None at most that many nodes are
        processed.

        Raises `RuntimeError` if called re-entrantly or while the manager
        is in a state in which it can not process (see `_can_process`).
        """
        if self.__runtime_state == SignalManager.Processing:
            raise RuntimeError("Cannot re-enter 'process_queued'")

        if not self._can_process():
            raise RuntimeError("Can't process in state %i" % self.__state)

        log.info("Processing queued signals")
        scheme = self.scheme()

        blocking_nodes = set(self.blocking_nodes())

        # blocking nodes together with everything downstream of them
        blocked_nodes = set(blocking_nodes)
        for node in blocking_nodes:
            blocked_nodes.update(scheme.downstream_nodes(node))

        pending = set(self.pending_nodes())

        # everything downstream of any pending node
        pending_downstream = set()
        for node in pending:
            pending_downstream.update(scheme.downstream_nodes(node))

        log.debug("Pending nodes: %s", pending)
        log.debug("Blocking nodes: %s", blocking_nodes)

        # nodes on the update front (they have no ancestor which is
        # either scheduled for update or is in blocking state)
        node_update_front = list(pending - pending_downstream - blocked_nodes)

        if max_nodes is not None:
            node_update_front = node_update_front[:max_nodes]

        log.debug("Nodes for update %s",
                  [node.title for node in node_update_front])

        self._set_runtime_state(SignalManager.Processing)
        try:
            for node in node_update_front:
                self.process_node(node)
        finally:
            # always leave the 'Processing' state, even if delivery failed
            self._set_runtime_state(SignalManager.Waiting)

    def process_node(self, node):
        """
        Process pending input signals for `node`.

        The signals are removed from the queue, passed through
        `compress_signals` and handed to `send_to_node`, bracketed by the
        `processingStarted`/`processingFinished` signals.
        """
        signals_in = self.pending_input_signals(node)
        self.remove_pending_signals(node)

        signals_in = self.compress_signals(signals_in)

        log.debug("Processing %r, sending %i signals.",
                  node.title, len(signals_in))

        self.processingStarted.emit()
        self.processingStarted[SchemeNode].emit(node)
        try:
            self.send_to_node(node, signals_in)
        finally:
            self.processingFinished.emit()
            self.processingFinished[SchemeNode].emit(node)

    def compress_signals(self, signals):
        """
        Compress a list of :class:`_Signal` instances to be delivered.

        The base implementation returns the list unmodified.

        """
        return signals

    def send_to_node(self, node, signals):
        """
        Abstract. Reimplement in subclass.

        Send/notify the :class:`SchemeNode` instance (or whatever
        object/instance it is a representation of) that it has new inputs
        as represented by the signals list (list of :class:`_Signal`).

        """
        raise NotImplementedError

    def is_pending(self, node):
        """
        Is `node` (class:`SchemeNode`) scheduled for processing (i.e.
        it has incoming pending signals).

        """
        return node in (signal.link.sink_node for signal in self._input_queue)

    def pending_nodes(self):
        """
        Return a list of pending nodes (in no particular order).
        """
        return list(set(sig.link.sink_node for sig in self._input_queue))

    def pending_input_signals(self, node):
        """
        Return a list of pending input signals for node.
        """
        return [signal for signal in self._input_queue
                if node is signal.link.sink_node]

    def remove_pending_signals(self, node):
        """
        Remove pending signals for `node`.
        """
        # Filter by identity of the sink node (the same test used by
        # `pending_input_signals`) and update the queue in place. This
        # avoids `list.remove`, which compares whole signals (including
        # their values) for equality.
        self._input_queue[:] = [signal for signal in self._input_queue
                                if signal.link.sink_node is not node]

    def blocking_nodes(self):
        """
        Return a list of nodes in a blocking state.
        """
        scheme = self.scheme()
        return [node for node in scheme.nodes if self.is_blocking(node)]

    def is_blocking(self, node):
        """
        Is `node` in a blocking state. The base implementation always
        returns False.
        """
        return False

    def event(self, event):
        """
        Handle `QEvent.UpdateRequest` by processing the queued signals;
        all other events are passed to `QObject.event`.
        """
        if event.type() == QEvent.UpdateRequest:
            if not self.__state == SignalManager.Running:
                log.debug("Received UpdateRequest event while not "
                          "in 'Running' state")
                event.setAccepted(False)
                return False

            if self.__runtime_state == SignalManager.Processing:
                log.debug("received UpdateRequest event while in "
                          "'process_queued'")
                # This happens if someone calls QCoreApplication.processEvents
                # from the signal handlers.
                self.__reschedule = True
                event.accept()
                return True

            log.debug("UpdateRequest event, queued signals: %i",
                      len(self._input_queue))
            if self._input_queue:
                self.process_queued()
            event.accept()

            if self.__reschedule:
                # a request arrived while processing; post it again
                log.debug("Rescheduling UpdateRequest event")
                self._update()
                self.__reschedule = False

            return True

        return QObject.event(self, event)

    def _update(self):
        """
        Schedule processing at a later time (post an `UpdateRequest` event
        to self).
        """
        QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
450
451
def can_enable_dynamic(link, value):
    """
    Return True if the dynamic `link` (:class:`SchemeLink`) can be enabled
    for `value`, i.e. if `value` is an instance of the link's sink type.
    """
    expected_type = link.sink_type()
    return isinstance(value, expected_type)
457
458
def compress_signals(signals):
    """
    Compress a list of signals.

    For every (link, id) pair only the most recent signal is kept. If any
    of the earlier signals for that pair carried a `None` value, a `None`
    signal is placed right before the most recent one in the result.
    """
    def any_none(older):
        # Did one of the superseded signals carry a None value?
        for sig in older:
            if sig.value is None:
                return True
        return False

    # Walk the signals newest first, so the head of each group is the
    # latest signal for its (link, id) pair.
    newest_first = group_by_all(reversed(signals),
                                key=lambda sig: (sig.link, sig.id))

    compressed = []
    for (link, sig_id), group in newest_first:
        compressed.append(group[0])
        if any_none(group[1:]):
            compressed.append(_Signal(link, None, sig_id))

    # restore the oldest-first order
    compressed.reverse()
    return compressed
478
479
def group_by_all(sequence, key=None):
    """
    Group all items of `sequence` that share the same key.

    `key` is a function computing the (hashable) group key of an item;
    when None the item itself is the key. Return a list of
    ``(key, items)`` tuples ordered by the first appearance of each key,
    where `items` lists the group's members in their original order.
    """
    if key is None:
        key_of = lambda item: item
    else:
        key_of = key

    buckets = {}
    ordered = []
    for element in sequence:
        element_key = key_of(element)
        bucket = buckets.get(element_key)
        if bucket is None:
            # first time this key is seen; remember its position
            bucket = buckets[element_key] = []
            ordered.append((element_key, bucket))
        bucket.append(element)

    return ordered
Note: See TracBrowser for help on using the repository browser.