Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • Orange/OrangeCanvas/scheme/signalmanager.py

    r11269 r11467  
    1010 
    1111import logging 
    12  
    13 from collections import namedtuple, defaultdict 
     12import itertools 
     13 
     14from collections import namedtuple, defaultdict, deque 
    1415from operator import attrgetter, add 
     16from functools import partial 
    1517 
    1618 
     
    171173        # NOTE: This does not remove output signals this node. In particular 
    172174        # the final 'None' values might be left on the queue. 
    173         log.info("Node %r removed. Removing pending signals.") 
     175        log.info("Node %r removed. Removing pending signals.", 
     176                 node.title) 
    174177        self.remove_pending_signals(node) 
    175178 
     
    296299 
    297300        log.info("Processing queued signals") 
    298         scheme = self.scheme() 
    299  
    300         blocking_nodes = set(self.blocking_nodes()) 
    301  
    302         blocked_nodes = reduce(set.union, 
    303                                map(scheme.downstream_nodes, blocking_nodes), 
    304                                set(blocking_nodes)) 
    305  
    306         pending = set(self.pending_nodes()) 
    307  
    308         pending_downstream = reduce(set.union, 
    309                                     map(scheme.downstream_nodes, pending), 
    310                                     set()) 
    311  
    312         log.debug("Pending nodes: %s", pending) 
    313         log.debug("Blocking nodes: %s", blocking_nodes) 
    314  
    315         # nodes on the update front (they have no ancestor which is 
    316         # either scheduled for update or is in blocking state) 
    317         node_update_front = list(pending - pending_downstream - blocked_nodes) 
     301 
     302        node_update_front = self.node_update_front() 
    318303 
    319304        if max_nodes is not None: 
     
    325310        self._set_runtime_state(SignalManager.Processing) 
    326311        try: 
     312            # TODO: What if the update front changes in the loop? 
    327313            for node in node_update_front: 
    328314                self.process_node(node) 
     
    411397        return False 
    412398 
     399    def node_update_front(self): 
     400        """ 
     401        Return a list of nodes on the update front, i.e. nodes scheduled for 
     402        an update that have no ancestor which is either itself scheduled 
     403        for update or is in a blocking state) 
     404 
     405        .. note:: 
     406            The node's ancestors are only computed over enabled links. 
     407 
     408        """ 
     409        scheme = self.scheme() 
     410 
     411        blocking_nodes = set(self.blocking_nodes()) 
     412 
     413        dependents = partial(dependent_nodes, scheme) 
     414 
     415        blocked_nodes = reduce(set.union, 
     416                               map(dependents, blocking_nodes), 
     417                               set(blocking_nodes)) 
     418 
     419        pending = set(self.pending_nodes()) 
     420 
     421        pending_downstream = reduce(set.union, 
     422                                    map(dependents, pending), 
     423                                    set()) 
     424 
     425        log.debug("Pending nodes: %s", pending) 
     426        log.debug("Blocking nodes: %s", blocking_nodes) 
     427 
     428        return list(pending - pending_downstream - blocked_nodes) 
     429 
    413430    def event(self, event): 
    414431        if event.type() == QEvent.UpdateRequest: 
    415432            if not self.__state == SignalManager.Running: 
    416                 log.debug("Received UpdateRequest event while not " 
     433                log.debug("Received 'UpdateRequest' event while not " 
    417434                          "in 'Running' state") 
    418435                event.setAccepted(False) 
     
    420437 
    421438            if self.__runtime_state == SignalManager.Processing: 
    422                 log.debug("received UpdateRequest event while in " 
     439                log.debug("Received 'UpdateRequest' event while in " 
    423440                          "'process_queued'") 
    424441                # This happens if someone calls QCoreApplication.processEvents 
     
    428445                return True 
    429446 
    430             log.debug("UpdateRequest event, queued signals: %i", 
     447            log.info("'UpdateRequest' event, queued signals: %i", 
    431448                      len(self._input_queue)) 
    432449            if self._input_queue: 
    433                 self.process_queued() 
     450                self.process_queued(max_nodes=1) 
    434451            event.accept() 
    435452 
    436453            if self.__reschedule: 
    437                 log.debug("Rescheduling UpdateRequest event") 
     454                log.debug("Rescheduling 'UpdateRequest' event") 
    438455                self._update() 
    439456                self.__reschedule = False 
     457            elif self.node_update_front(): 
     458                log.debug("More nodes are eligible for an update. " 
     459                          "Scheduling another update.") 
     460                self._update() 
    440461 
    441462            return True 
     
    476497 
    477498    return list(reversed(signals)) 
     499 
     500 
     501def dependent_nodes(scheme, node): 
     502    """ 
     503    Return a list of all nodes (in breadth first order) in `scheme` that 
     504    are dependent on `node`, 
     505 
     506    .. note:: 
     507        This does not include nodes only reachable by disables links. 
     508 
     509    """ 
     510    def expand(node): 
     511        return [link.sink_node 
     512                for link in scheme.find_links(source_node=node) 
     513                if link.enabled] 
     514 
     515    nodes = list(traverse_bf(node, expand)) 
     516    assert nodes[0] is node 
     517    # Remove the first item (`node`). 
     518    return nodes[1:] 
     519 
     520 
     521def traverse_bf(start, expand): 
     522    queue = deque([start]) 
     523    visited = set() 
     524    while queue: 
     525        item = queue.popleft() 
     526        if item not in visited: 
     527            yield item 
     528            visited.add(item) 
     529            queue.extend(expand(item)) 
    478530 
    479531 
Note: See TracChangeset for help on using the changeset viewer.