Ignore:
Timestamp:
04/22/13 16:07:44 (12 months ago)
Author:
Ales Erjavec <ales.erjavec@…>
Branch:
default
Message:

Fix handling of 'blocking' nodes.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • Orange/OrangeCanvas/scheme/signalmanager.py

    r11465 r11466  
    1010 
    1111import logging 
    12  
    13 from collections import namedtuple, defaultdict 
     12import itertools 
     13 
     14from collections import namedtuple, defaultdict, deque 
    1415from operator import attrgetter, add 
     16from functools import partial 
    1517 
    1618 
     
    297299 
    298300        log.info("Processing queued signals") 
    299         scheme = self.scheme() 
    300  
    301         blocking_nodes = set(self.blocking_nodes()) 
    302  
    303         blocked_nodes = reduce(set.union, 
    304                                map(scheme.downstream_nodes, blocking_nodes), 
    305                                set(blocking_nodes)) 
    306  
    307         pending = set(self.pending_nodes()) 
    308  
    309         pending_downstream = reduce(set.union, 
    310                                     map(scheme.downstream_nodes, pending), 
    311                                     set()) 
    312  
    313         log.debug("Pending nodes: %s", pending) 
    314         log.debug("Blocking nodes: %s", blocking_nodes) 
    315  
    316         # nodes on the update front (they have no ancestor which is 
    317         # either scheduled for update or is in blocking state) 
    318         node_update_front = list(pending - pending_downstream - blocked_nodes) 
     301 
     302        node_update_front = self.node_update_front() 
    319303 
    320304        if max_nodes is not None: 
     
    326310        self._set_runtime_state(SignalManager.Processing) 
    327311        try: 
     312            # TODO: What if the update front changes in the loop? 
    328313            for node in node_update_front: 
    329314                self.process_node(node) 
     
    411396    def is_blocking(self, node): 
    412397        return False 
     398 
     399    def node_update_front(self): 
     400        """ 
     401        Return a list of nodes on the update front, i.e. nodes scheduled for 
     402        an update that have no ancestor which is either itself scheduled 
     403        for update or is in a blocking state) 
     404 
     405        .. note:: 
     406            The node's ancestors are only computed over enabled links. 
     407 
     408        """ 
     409        scheme = self.scheme() 
     410 
     411        blocking_nodes = set(self.blocking_nodes()) 
     412 
     413        dependents = partial(dependent_nodes, scheme) 
     414 
     415        blocked_nodes = reduce(set.union, 
     416                               map(dependents, blocking_nodes), 
     417                               set(blocking_nodes)) 
     418 
     419        pending = set(self.pending_nodes()) 
     420 
     421        pending_downstream = reduce(set.union, 
     422                                    map(dependents, pending), 
     423                                    set()) 
     424 
     425        log.debug("Pending nodes: %s", pending) 
     426        log.debug("Blocking nodes: %s", blocking_nodes) 
     427 
     428        return list(pending - pending_downstream - blocked_nodes) 
    413429 
    414430    def event(self, event): 
     
    477493 
    478494    return list(reversed(signals)) 
     495 
     496 
     497def dependent_nodes(scheme, node): 
     498    """ 
     499    Return a list of all nodes (in breadth first order) in `scheme` that 
     500    are dependent on `node`, 
     501 
     502    .. note:: 
     503        This does not include nodes only reachable by disables links. 
     504 
     505    """ 
     506    def expand(node): 
     507        return [link.sink_node 
     508                for link in scheme.find_links(source_node=node) 
     509                if link.enabled] 
     510 
     511    nodes = list(traverse_bf(node, expand)) 
     512    assert nodes[0] is node 
     513    # Remove the first item (`node`). 
     514    return nodes[1:] 
     515 
     516 
     517def traverse_bf(start, expand): 
     518    queue = deque([start]) 
     519    visited = set() 
     520    while queue: 
     521        item = queue.popleft() 
     522        if item not in visited: 
     523            yield item 
     524            visited.add(item) 
     525            queue.extend(expand(item)) 
    479526 
    480527 
Note: See TracChangeset for help on using the changeset viewer.