Changeset 11779:156bb31cf132 in orange


Ignore:
Timestamp:
12/02/13 15:14:55 (5 months ago)
Author:
Ales Erjavec <ales.erjavec@…>
Branch:
default
Message:

ThreadExecutor should only wait for futures submited through it.

This is because different executors can share the same thread pool.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • Orange/OrangeWidgets/OWConcurrent.py

    r11769 r11779  
    165165        self._threadPool = threadPool 
    166166        self._depot_thread = None 
     167        self._futures = [] 
     168        self._shutdown = False 
     169        self._state_lock = threading.Lock() 
    167170 
    168171    def _get_depot_thread(self): 
     
    179182 
    180183        """ 
    181         if isinstance(func, Task): 
    182             if func.thread() is not QThread.currentThread(): 
    183                 raise ValueError("Can only submit Tasks from it's own thread.") 
    184  
    185             if func.parent() is not None: 
    186                 raise ValueError("Can not submit Tasks with a parent.") 
    187  
    188             func.moveToThread(self._get_depot_thread()) 
    189             assert func.thread() is self._get_depot_thread() 
    190             # Use the Task's own Future object 
    191             f = func.future() 
    192             runnable = _TaskRunnable(f, func, args, kwargs) 
    193         else: 
    194             f = Future() 
    195             runnable = _Runnable(f, func, args, kwargs) 
    196         self._threadPool.start(runnable) 
    197  
    198         return f 
     184        with self._state_lock: 
     185            if self._shutdown: 
     186                raise RuntimeError("Cannot schedule new futures after " + 
     187                                   "shutdown.") 
     188 
     189            if isinstance(func, Task): 
     190                if func.thread() is not QThread.currentThread(): 
     191                    raise ValueError("Can only submit Tasks from it's own " + 
     192                                     "thread.") 
     193 
     194                if func.parent() is not None: 
     195                    raise ValueError("Can not submit Tasks with a parent.") 
     196 
     197                func.moveToThread(self._get_depot_thread()) 
     198                assert func.thread() is self._get_depot_thread() 
     199                # Use the Task's own Future object 
     200                f = func.future() 
     201                runnable = _TaskRunnable(f, func, args, kwargs) 
     202            else: 
     203                f = Future() 
     204                runnable = _Runnable(f, func, args, kwargs) 
     205 
     206            self._futures.append(f) 
     207            f._watchers.append(self._future_state_change) 
     208            self._threadPool.start(runnable) 
     209            return f 
    199210 
    200211    def map(self, func, *iterables): 
     
    209220        wait until all pending futures are executed or cancelled. 
    210221        """ 
     222        with self._state_lock: 
     223            self._shutdown = True 
     224 
    211225        if self._depot_thread is not None: 
    212226            QMetaObject.invokeMethod( 
     
    214228 
    215229        if wait: 
    216             self._threadPool.waitForDone() 
    217             if self._depot_thread: 
     230            # Wait until all futures have completed. 
     231            for future in list(self._futures): 
     232                try: 
     233                    future.exception() 
     234                except (TimeoutError, CancelledError): 
     235                    pass 
     236 
     237            if self._depot_thread is not None: 
    218238                self._depot_thread.wait() 
    219239                self._depot_thread = None 
     240 
     241    def _future_state_change(self, future, state): 
     242        # Remove futures when finished. 
     243        if state == Future.Finished: 
     244            self._futures.remove(future) 
    220245 
    221246 
Note: See TracChangeset for help on using the changeset viewer.