Changeset 11481:08b42fee6cf6 in orange


Ignore:
Timestamp:
04/30/13 10:52:46 (12 months ago)
Author:
Ales Erjavec <ales.erjavec@…>
Branch:
default
Message:

Extended the OWConcurrent module with 'concurrent.futures' like interface.

The old interface is now deprecated.

Location:
Orange/OrangeWidgets
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • Orange/OrangeWidgets/OWBaseWidget.py

    r11458 r11481  
    199199        widgetId += 1 
    200200        self.widgetId = widgetId 
    201          
    202         self._private_thread_pools = {} 
     201 
    203202        self.asyncCalls = [] 
    204203        self.asyncBlock = False 
  • Orange/OrangeWidgets/OWConcurrent.py

    r9671 r11481  
    88""" 
    99from __future__ import with_statement 
     10 
     11import sys 
     12import threading 
     13import logging 
     14 
    1015from functools import partial 
    11  
    12      
    13 from OWWidget import * 
     16from contextlib import contextmanager 
     17 
     18from PyQt4.QtGui import qApp 
     19 
     20from PyQt4.QtCore import ( 
     21    Qt, QObject, QMetaObject, QTimer, QThreadPool, QThread, QMutex, 
     22    QRunnable, QEventLoop, QCoreApplication, QEvent, QString, SIGNAL, 
     23    Q_ARG, pyqtSignature, 
     24) 
     25 
     26from PyQt4.QtCore import pyqtSignal as Signal, pyqtSlot as Slot 
     27 
     28_log = logging.getLogger(__name__) 
     29 
     30 
     31@contextmanager 
     32def locked(mutex): 
     33    """ 
     34    A context manager for locking an instance of a QMutex. 
     35    """ 
     36    mutex.lock() 
     37    try: 
     38        yield 
     39    finally: 
     40        mutex.unlock() 
     41 
     42 
     43class _TaskDepotThread(QThread): 
     44    """ 
     45    A special 'depot' thread used to transfer Task instance into threads 
     46    started by a QThreadPool. 
     47 
     48    """ 
     49    def start(self): 
     50        """ 
     51        Reimplemented from `QThread.start` 
     52        """ 
     53        QThread.start(self) 
     54        # Need to also handle method invoke from this thread 
     55        self.moveToThread(self) 
     56 
     57    def run(self): 
     58        """ 
     59        Reimplemented from `QThread.run` 
     60        """ 
     61        # Start the event loop. 
     62        # On some old Qt4/PyQt4 installations base QThread.run does not seem 
     63        # to enter the loop, despite being documented to do so. 
     64        self.exec_() 
     65 
     66    @Slot(object, object) 
     67    def transfer(self, obj, thread): 
     68        """ 
     69        Transfer `obj` (:class:`QObject`) instance from this thread to the 
     70        target `thread` (a :class:`QThread`). 
     71 
     72        """ 
     73        assert obj.thread() is self 
     74        assert QThread.currentThread() is self 
     75        obj.moveToThread(thread) 
     76 
     77 
     78class _TaskRunnable(QRunnable): 
     79    """ 
     80    A QRunnable for running a :class:`Task` by a :class:`ThreadExecuter`. 
     81    """ 
     82    def __init__(self, future, task, args, kwargs): 
     83        QRunnable.__init__(self) 
     84        self.future = future 
     85        self.task = task 
     86        self.args = args 
     87        self.kwargs = kwargs 
     88        self.eventLoop = None 
     89 
     90    def run(self): 
     91        """ 
     92        Reimplemented from `QRunnable.run` 
     93        """ 
     94        self.eventLoop = QEventLoop() 
     95        self.eventLoop.processEvents() 
     96 
     97        # Move the task to the current thread so it's events, signals, slots 
     98        # are triggered from this thread. 
     99        assert isinstance(self.task.thread(), _TaskDepotThread) 
     100        QMetaObject.invokeMethod( 
     101            self.task.thread(), "transfer", Qt.BlockingQueuedConnection, 
     102            Q_ARG(object, self.task), 
     103            Q_ARG(object, QThread.currentThread()) 
     104        ) 
     105 
     106        self.eventLoop.processEvents() 
     107 
     108        # Schedule task.run from the event loop. 
     109        self.task.start() 
     110 
     111        # Quit the loop and exit when task finishes or is cancelled. 
     112        # TODO: If the task encounters an critical error it might not emit 
     113        # these signals and this Runnable will never complete. 
     114        self.task.finished.connect(self.eventLoop.quit) 
     115        self.task.cancelled.connect(self.eventLoop.quit) 
     116        self.eventLoop.exec_() 
     117 
     118 
     119class _Runnable(QRunnable): 
     120    """ 
     121    A QRunnable for running plain functions by a :class:`ThreadExecuter`. 
     122    """ 
     123    def __init__(self, future, func, args, kwargs): 
     124        QRunnable.__init__(self) 
     125        self.future = future 
     126        self.func = func 
     127        self.args = args 
     128        self.kwargs = kwargs 
     129 
     130    def run(self): 
     131        """ 
     132        Reimplemented from QRunnable.run 
     133        """ 
     134        try: 
     135            if not self.future.set_running_or_notify_cancel(): 
     136                # Was cancelled 
     137                return 
     138            try: 
     139                result = self.func(*self.args, **self.kwargs) 
     140            except BaseException, ex: 
     141                self.future.set_exception(ex) 
     142            else: 
     143                self.future.set_result(result) 
     144        except BaseException: 
     145            _log.critical("Exception in worker thread.", exc_info=True) 
     146 
     147 
     148class ThreadExecutor(QObject): 
     149    """ 
     150    ThreadExceuter object class provides an interface for running tasks 
     151    in a thread pool. 
     152 
     153    :param QObject parent: 
     154        Executor's parent instance. 
     155 
     156    :param QThreadPool threadPool: 
     157        Thread pool to be used by the instance of the Executor. If `None` 
     158        then ``QThreadPool.globalInstance()`` will be used. 
     159 
     160    """ 
     161    def __init__(self, parent=None, threadPool=None): 
     162        QObject.__init__(self, parent) 
     163        if threadPool is None: 
     164            threadPool = QThreadPool.globalInstance() 
     165        self._threadPool = threadPool 
     166        self._depot_thread = None 
     167 
     168    def _get_depot_thread(self): 
     169        if self._depot_thread is None: 
     170            self._depot_thread = _TaskDepotThread() 
     171            self._depot_thread.start() 
     172 
     173        return self._depot_thread 
     174 
     175    def submit(self, func, *args, **kwargs): 
     176        """ 
     177        Schedule the `func(*args, **kwargs)` to be executed and return an 
     178        :class:`Future` instance representing the result of the computation. 
     179 
     180        """ 
     181        if isinstance(func, Task): 
     182            if func.thread() is not QThread.currentThread(): 
     183                raise ValueError("Can only submit Tasks from it's own thread.") 
     184 
     185            if func.parent() is not None: 
     186                raise ValueError("Can not submit Tasks with a parent.") 
     187 
     188            func.moveToThread(self._get_depot_thread()) 
     189            assert func.thread() is self._get_depot_thread() 
     190            # Use the Task's own Future object 
     191            f = func.future() 
     192            runnable = _TaskRunnable(f, func, args, kwargs) 
     193        else: 
     194            f = Future() 
     195            runnable = _Runnable(f, func, args, kwargs) 
     196        self._threadPool.start(runnable) 
     197 
     198        return f 
     199 
     200    def map(self, func, *iterables): 
     201        futures = [self.submit(func, *args) for args in zip(*iterables)] 
     202 
     203        for f in futures: 
     204            yield f.result() 
     205 
     206    def shutdown(self, wait=True): 
     207        """ 
     208        Shutdown the executor and free all resources. If `wait` is True then 
     209        wait until all pending futures are executed or cancelled. 
     210        """ 
     211        if self._depot_thread is not None: 
     212            QMetaObject.invokeMethod( 
     213                self._depot_thread, "quit", Qt.AutoConnection) 
     214 
     215        if wait: 
     216            self._threadPool.waitForDone() 
     217            if self._depot_thread: 
     218                self._depot_thread.wait() 
     219                self._depot_thread = None 
     220 
     221 
     222class ExecuteCallEvent(QEvent): 
     223    """ 
     224    Represents an function call from the event loop (used by :class:`Task` 
     225    to schedule the :func:`Task.run` method to be invoked) 
     226 
     227    """ 
     228    ExecuteCall = QEvent.registerEventType() 
     229 
     230    def __init__(self): 
     231        QEvent.__init__(self, ExecuteCallEvent.ExecuteCall) 
     232 
     233 
     234class Task(QObject): 
     235    """ 
     236    """ 
     237    started = Signal() 
     238    finished = Signal() 
     239    cancelled = Signal() 
     240    resultReady = Signal(object) 
     241    exceptionReady = Signal(Exception) 
     242 
     243    def __init__(self, parent=None, function=None): 
     244        QObject.__init__(self, parent) 
     245        self.function = function 
     246 
     247        self._future = Future() 
     248 
     249    def run(self): 
     250        if self.function is None: 
     251            raise NotImplementedError 
     252        else: 
     253            return self.function() 
     254 
     255    def start(self): 
     256        QCoreApplication.postEvent(self, ExecuteCallEvent()) 
     257 
     258    def future(self): 
     259        return self._future 
     260 
     261    def result(self, timeout=None): 
     262        return self._future.result(timeout) 
     263 
     264    def _execute(self): 
     265        try: 
     266            if not self._future.set_running_or_notify_cancel(): 
     267                self.cancelled.emit() 
     268                return 
     269 
     270            self.started.emit() 
     271            try: 
     272                result = self.run() 
     273            except BaseException, ex: 
     274                self._future.set_exception(ex) 
     275                self.exceptionReady.emit(ex) 
     276            else: 
     277                self._future.set_result(result) 
     278                self.resultReady.emit(result) 
     279 
     280            self.finished.emit() 
     281        except BaseException: 
     282            _log.critical("Exception in Task", exc_info=True) 
     283 
     284    def customEvent(self, event): 
     285        if event.type() == ExecuteCallEvent.ExecuteCall: 
     286            self._execute() 
     287        else: 
     288            QObject.customEvent(self, event) 
     289 
     290 
     291def futures_iter(futures): 
     292    for f in futures: 
     293        yield f.result() 
     294 
     295 
     296class TimeoutError(Exception): 
     297    pass 
     298 
     299 
     300class CancelledError(Exception): 
     301    pass 
     302 
     303 
     304class Future(object): 
     305    """ 
     306    A :class:`Future` class represents a result of an asynchronous 
     307    computation. 
     308 
     309    """ 
     310    Pending, Canceled, Running, Finished = 1, 2, 4, 8 
     311 
     312    def __init__(self): 
     313        self._watchers = [] 
     314        self._state = Future.Pending 
     315        self._condition = threading.Condition() 
     316        self._result = None 
     317        self._exception = None 
     318 
     319    def _set_state(self, state): 
     320        if self._state != state: 
     321            self._state = state 
     322            for w in self._watchers: 
     323                w(self, state) 
     324 
     325    def cancel(self): 
     326        """ 
     327        Attempt to cancel the the call. Return `False` if the call is 
     328        already in progress and cannot be canceled, otherwise return `True`. 
     329 
     330        """ 
     331        with self._condition: 
     332            if self._state in [Future.Running, Future.Finished]: 
     333                return False 
     334            elif self._state == Future.Canceled: 
     335                return True 
     336            else: 
     337                self._state = Future.Canceled 
     338                self._condition.notify_all() 
     339 
     340        return True 
     341 
     342    def cancelled(self): 
     343        """ 
     344        Return `True` if call was successfully cancelled. 
     345        """ 
     346        with self._condition: 
     347            return self._state == Future.Canceled 
     348 
     349    def done(self): 
     350        """ 
     351        Return `True` if the call was successfully cancelled or finished 
     352        running. 
     353 
     354        """ 
     355        with self._condition: 
     356            return self._state in [Future.Canceled, Future.Finished] 
     357 
     358    def running(self): 
     359        """ 
     360        Return True if the call is currently being executed. 
     361        """ 
     362        with self._condition: 
     363            return self._state == Future.Running 
     364 
     365    def _get_result(self): 
     366        if self._exception: 
     367            raise self._exception 
     368        else: 
     369            return self._result 
     370 
     371    def result(self, timeout=None): 
     372        """ 
     373        Return the result of the :class:`Futures` computation. If `timeout` 
     374        is `None` the call will block until either the computation finished 
     375        or is cancelled. 
     376        """ 
     377        with self._condition: 
     378            if self._state == Future.Finished: 
     379                return self._get_result() 
     380            elif self._state == Future.Canceled: 
     381                raise CancelledError() 
     382 
     383            self._condition.wait(timeout) 
     384 
     385            if self._state == Future.Finished: 
     386                return self._get_result() 
     387            elif self._state == Future.Canceled: 
     388                raise CancelledError() 
     389            else: 
     390                raise TimeoutError() 
     391 
     392    def exception(self, timeout=None): 
     393        """ 
     394        Return the exception instance (if any) resulting from the execution 
     395        of the :class:`Future`. Can raise a :class:`CancelledError` if the 
     396        computation was cancelled. 
     397 
     398        """ 
     399        with self._condition: 
     400            if self._state == Future.Finished: 
     401                return self._exception 
     402            elif self._state == Future.Canceled: 
     403                raise CancelledError() 
     404 
     405            self._condition.wait(timeout) 
     406 
     407            if self._state == Future.Finished: 
     408                return self._exception 
     409            elif self._state == Future.Canceled: 
     410                raise CancelledError() 
     411            else: 
     412                raise TimeoutError() 
     413 
     414    def set_result(self, result): 
     415        """ 
     416        Set the result of the computation (called by the worker thread). 
     417        """ 
     418        with self._condition: 
     419            self._result = result 
     420            self._state = Future.Finished 
     421            self._condition.notify_all() 
     422 
     423    def set_exception(self, exception): 
     424        """ 
     425        Set the exception instance that was raised by the computation 
     426        (called by the worker thread). 
     427 
     428        """ 
     429        with self._condition: 
     430            self._exception = exception 
     431            self._state = Future.Finished 
     432            self._condition.notify_all() 
     433 
     434    def set_running_or_notify_cancel(self): 
     435        with self._condition: 
     436            if self._state == Future.Canceled: 
     437                return False 
     438            elif self._state == Future.Pending: 
     439                self._state = Future.Running 
     440                return True 
     441            else: 
     442                raise Exception() 
     443 
     444 
     445class StateChangedEvent(QEvent): 
     446    """ 
     447    Represents a change in the internal state of a :class:`Future`. 
     448    """ 
     449    StateChanged = QEvent.registerEventType() 
     450 
     451    def __init__(self, state): 
     452        QEvent.__init__(self, StateChangedEvent.StateChanged) 
     453        self._state = state 
     454 
     455    def state(self): 
     456        """ 
     457        Return the new state (Future.Pending, Future.Cancelled, ...). 
     458        """ 
     459        return self._state 
     460 
     461 
     462class FutureWatcher(QObject): 
     463    """ 
     464    A `FutureWatcher` class provides a convenient interface to the 
     465    :class:`Future` instance using Qt's signals. 
     466 
     467    :param :class:`Future` future: 
     468        A :class:`Future` instance to watch. 
     469    :param :class:`QObject` parent: 
     470        Object's parent instance. 
     471 
     472    """ 
     473    #: The future was cancelled. 
     474    cancelled = Signal() 
     475 
     476    #: The future has finished. 
     477    finished = Signal() 
     478 
     479    #: The future has started computation. 
     480    started = Signal() 
     481 
     482    def __init__(self, future, parent=None): 
     483        QObject.__init__(self, parent) 
     484        self._future = future 
     485 
     486        self._future._watchers.append(self._stateChanged) 
     487 
     488    def isCancelled(self): 
     489        """ 
     490        Was the future cancelled. 
     491        """ 
     492        return self._future.cancelled() 
     493 
     494    def isDone(self): 
     495        """ 
     496        Is the future done (was cancelled or has finished). 
     497        """ 
     498        return self._future.done() 
     499 
     500    def isRunning(self): 
     501        """ 
     502        Is the future running (i.e. has started). 
     503        """ 
     504        return self._future.running() 
     505 
     506    def isStarted(self): 
     507        """ 
     508        Has the future computation started. 
     509        """ 
     510        return self._future.running() 
     511 
     512    def result(self): 
     513        """ 
     514        Return the result of the computation. 
     515        """ 
     516        return self._future.result() 
     517 
     518    def exception(self): 
     519        """ 
     520        Return the exception instance or `None` if no exception was raised. 
     521        """ 
     522        return self._future.exception() 
     523 
     524    def customEvent(self, event): 
     525        """ 
     526        Reimplemented from `QObject.customEvent`. 
     527        """ 
     528        if event.type() == StateChangedEvent.StateChanged: 
     529            if event.state() == Future.Canceled: 
     530                self.cancelled.emit() 
     531            elif event.state() == Future.Running: 
     532                self.started.emit() 
     533            elif event.state() == Future.Finished: 
     534                self.finished.emit() 
     535            return 
     536 
     537        return QObject.customEvent(self, event) 
     538 
     539    def _stateChanged(self, future, state): 
     540        """ 
     541        The `future` state has changed (called by :class:`Future`). 
     542        """ 
     543        ev = StateChangedEvent(state) 
     544 
     545        if self.thread() is QThread.currentThread(): 
     546            QCoreApplication.sendEvent(self, ev) 
     547        else: 
     548            QCoreApplication.postEvent(self, ev) 
     549 
     550 
     551class methodinvoke(object): 
     552    """ 
     553    Create an QObject method wrapper that invokes the method asynchronously 
     554    in the object's own thread. 
     555 
     556    :param obj: 
     557        A QObject instance. 
     558    :param str method: 
     559        The method name. 
     560    :param tuple arg_types: 
     561        A tuple of positional argument types. 
     562 
     563    """ 
     564    def __init__(self, obj, method, arg_types=()): 
     565        self.obj = obj 
     566        self.method = method 
     567        self.arg_types = tuple(arg_types) 
     568 
     569    def __call__(self, *args): 
     570        args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)] 
     571        QMetaObject.invokeMethod( 
     572            self.obj, self.method, Qt.QueuedConnection, 
     573            *args 
     574        ) 
     575 
     576 
     577try: 
     578    import unittest2 as unittest 
     579except ImportError: 
     580    import unittest 
     581 
     582 
     583class TestFutures(unittest.TestCase): 
     584 
     585    def test_futures(self): 
     586        f = Future() 
     587        self.assertEqual(f.done(),  False) 
     588        self.assertEqual(f.running(),  False) 
     589 
     590        self.assertTrue(f.cancel()) 
     591        self.assertTrue(f.cancelled()) 
     592 
     593        with self.assertRaises(CancelledError): 
     594            f.result() 
     595 
     596        with self.assertRaises(CancelledError): 
     597            f.exception() 
     598 
     599        f = Future() 
     600        f.set_running_or_notify_cancel() 
     601 
     602        with self.assertRaises(TimeoutError): 
     603            f.result(0.1) 
     604 
     605        with self.assertRaises(TimeoutError): 
     606            f.exception(0.1) 
     607 
     608        f = Future() 
     609        f.set_running_or_notify_cancel() 
     610        f.set_result("result") 
     611 
     612        self.assertEqual(f.result(), "result") 
     613        self.assertEqual(f.exception(), None) 
     614 
     615        f = Future() 
     616        f.set_running_or_notify_cancel() 
     617 
     618        f.set_exception(Exception("foo")) 
     619 
     620        with self.assertRaises(Exception): 
     621            f.result() 
     622 
     623 
     624class TestExecutor(unittest.TestCase): 
     625 
     626    def setUp(self): 
     627        self.app = QCoreApplication([]) 
     628 
     629    def test_executor(self): 
     630        executor = ThreadExecutor() 
     631        f1 = executor.submit(pow, 100, 100) 
     632 
     633        f2 = executor.submit(lambda: 1 / 0) 
     634 
     635        f3 = executor.submit(QThread.currentThread) 
     636 
     637        self.assertTrue(f1.result(), pow(100, 100)) 
     638 
     639        with self.assertRaises(ZeroDivisionError): 
     640            f2.result() 
     641 
     642        self.assertIsInstance(f2.exception(), ZeroDivisionError) 
     643 
     644        self.assertIsNot(f3.result(), QThread.currentThread()) 
     645 
     646    def test_methodinvoke(self): 
     647        executor = ThreadExecutor() 
     648        state = [None, None] 
     649 
     650        class StateSetter(QObject): 
     651            @Slot(object) 
     652            def set_state(self, value): 
     653                state[0] = value 
     654                state[1] = QThread.currentThread() 
     655 
     656        def func(callback): 
     657            callback(QThread.currentThread()) 
     658 
     659        obj = StateSetter() 
     660        f1 = executor.submit(func, methodinvoke(obj, "set_state", (object,))) 
     661        f1.result() 
     662 
     663        # So invoked method can be called 
     664        QCoreApplication.processEvents() 
     665 
     666        self.assertIs(state[1], QThread.currentThread(), 
     667                      "set_state was called from the wrong thread") 
     668 
     669        self.assertIsNot(state[0], QThread.currentThread(), 
     670                         "set_state was invoked in the main thread") 
     671 
     672        executor.shutdown(wait=True) 
     673 
     674    def test_executor_map(self): 
     675        executor = ThreadExecutor() 
     676 
     677        r = executor.map(pow, range(1000), range(1000)) 
     678 
     679        results = list(r) 
     680 
     681        self.assertTrue(len(results) == 1000) 
     682 
     683 
     684class TestFutureWatcher(unittest.TestCase): 
     685    def setUp(self): 
     686        self.app = QCoreApplication([]) 
     687 
     688    def test_watcher(self): 
     689        executor = ThreadExecutor() 
     690        f = executor.submit(QThread.currentThread) 
     691        watcher = FutureWatcher(f) 
     692 
     693        if f.cancel(): 
     694            self.assertTrue(watcher.isCancelled()) 
     695 
     696        executor.shutdown() 
     697 
     698 
     699class TestTask(unittest.TestCase): 
     700    def setUp(self): 
     701        self.app = QCoreApplication([]) 
     702 
     703    def test_task(self): 
     704        results = [] 
     705 
     706        task = Task(function=QThread.currentThread) 
     707        task.resultReady.connect(results.append) 
     708 
     709        task.start() 
     710        self.app.processEvents() 
     711 
     712        self.assertSequenceEqual(results, [QThread.currentThread()]) 
     713 
     714        results = [] 
     715 
     716        thread = QThread() 
     717        thread.start() 
     718 
     719        task = Task(function=QThread.currentThread) 
     720 
     721        task.moveToThread(thread) 
     722 
     723        self.assertIsNot(task.thread(), QThread.currentThread()) 
     724        self.assertIs(task.thread(), thread) 
     725 
     726        task.resultReady.connect(results.append, Qt.DirectConnection) 
     727        task.start() 
     728 
     729        f = task.future() 
     730 
     731        self.assertIsNot(f.result(3), QThread.currentThread()) 
     732 
     733        self.assertIs(f.result(3), results[-1]) 
     734 
     735    def test_executor(self): 
     736        executor = ThreadExecutor() 
     737 
     738        f = executor.submit(QThread.currentThread) 
     739 
     740        self.assertIsNot(f.result(3), QThread.currentThread()) 
     741 
     742        f = executor.submit(lambda: 1 / 0) 
     743 
     744        with self.assertRaises(ZeroDivisionError): 
     745            f.result() 
     746 
     747        results = [] 
     748        task = Task(function=QThread.currentThread) 
     749        task.resultReady.connect(results.append, Qt.DirectConnection) 
     750 
     751        f = executor.submit(task) 
     752 
     753        self.assertIsNot(f.result(3), QThread.currentThread()) 
     754 
     755        executor.shutdown() 
     756 
     757 
     758############################################ 
     759# DEPRECATED (not to mention extremely ugly) 
     760############################################ 
     761 
    14762 
    15763class AsyncCall(QObject): 
    16     """ A wrapper class for async function calls using 
    17     Qt's signals for communication with GUI thread 
     764    """ 
     765    A wrapper class for async function calls using Qt's signals for 
     766    communication between threads 
    18767 
    19768    Arguments: 
     
    24773           (for this to work `thread` argument must be `None`) 
    25774        - `parent`: parent object (should be None for most cases) 
    26          
     775 
    27776    Signals: 
    28777        - `starting()` 
    29         - `finished(QString)` - emited when finished with an argument 'OK' on success or repr(ex) on error 
    30         - `finished(PyQt_PyObject, QString)` - same as above but also pass self as an argument  
    31         - `unhandledException(PyQt_PyObject)` - emited on error with `sys.exc_info()` argument 
    32         - `resultReady(PyQt_PyObject)` - emited on success with function call result as argument 
    33     """ 
    34     def __init__(self, func=None, args=(), kwargs={}, thread=None, threadPool=None, parent=None): 
     778        - `finished(QString)` - emitted when finished with an argument 'OK' 
     779          on success or repr(ex) on error. 
     780        - `finished(PyQt_PyObject, QString)` - same as above but also 
     781          pass self as an argument. 
     782        - `unhandledException(PyQt_PyObject)` - emitted on error 
     783          with `sys.exc_info()` argument. 
     784        - `resultReady(PyQt_PyObject)` - emitted on success with 
     785          function call result as argument. 
     786 
     787    """ 
     788 
     789    def __init__(self, func=None, args=(), kwargs={}, thread=None, 
     790                 threadPool=None, parent=None): 
    35791        QObject.__init__(self, parent) 
    36792        self.func = func 
     
    38794        self._kwargs = kwargs 
    39795        self.threadPool = None 
    40          
     796 
    41797        self._connected = True 
    42798        self._cancelRequested = False 
    43799        self._started = False 
    44800        self._cancelled = False 
    45          
     801 
    46802        if thread is not None: 
    47803            self.moveToThread(thread) 
     
    50806                threadPool = QThreadPool.globalInstance() 
    51807            self.threadPool = threadPool 
    52             self._runnable = RunnableAsyncCall(self) 
     808            self._runnable = _RunnableAsyncCall(self) 
    53809            self.threadPool.start(self._runnable) 
    54810            self._connected = False 
    55811            return 
    56              
    57         self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection) 
    58          
    59  
     812 
     813        self.connect(self, SIGNAL("_async_start()"), self.execute, 
     814                     Qt.QueuedConnection) 
    60815 
    61816    @pyqtSignature("execute()") 
     
    69824            self.emit(SIGNAL("finished(QString)"), QString("Cancelled")) 
    70825            return 
     826 
    71827        self._started = True 
    72828        self.emit(SIGNAL("starting()")) 
    73829        try: 
    74             self.result  = self.func(*self._args, **self._kwargs) 
     830            self.result = self.func(*self._args, **self._kwargs) 
    75831        except Exception, ex: 
    76             print >> sys.stderr, "Exception in thread ", QThread.currentThread(), " while calling ", self.func 
     832            print >> sys.stderr, "Exception in thread ", \ 
     833                  QThread.currentThread(), " while calling ", self.func 
    77834            self.emit(SIGNAL("finished(QString)"), QString(repr(ex))) 
    78             self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString(repr(ex))) 
    79             self.emit(SIGNAL("unhandledException(PyQt_PyObject)"), sys.exc_info()) 
     835            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), 
     836                      self, QString(repr(ex))) 
     837            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"), 
     838                      sys.exc_info()) 
    80839 
    81840            self._exc_info = sys.exc_info() 
     
    84843 
    85844        self.emit(SIGNAL("finished(QString)"), QString("Ok")) 
    86         self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString("Ok")) 
    87         self.emit(SIGNAL("resultReady(PyQt_PyObject)"), self.result) 
     845        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), 
     846                  self, QString("Ok")) 
     847        self.emit(SIGNAL("resultReady(PyQt_PyObject)"), 
     848                  self.result) 
    88849        self._status = 0 
    89  
    90850 
    91851    def __call__(self, *args, **kwargs): 
     
    95855            self.func = partial(self.func, *self._args, **self._kwargs) 
    96856            self._args, self._kwargs = args, kwargs 
    97              
     857 
    98858        if not self._connected: 
    99             QTimer.singleShot(50, self.__call__) # delay until event loop initialized by RunnableAsyncCall 
     859            # delay until event loop initialized by _RunnableAsyncCall 
     860            QTimer.singleShot(50, self.__call__) 
    100861            return 
    101862        else: 
    102863            self.emit(SIGNAL("_async_start()")) 
    103864 
    104  
    105865    def apply_async(self, func, args, kwargs): 
    106866        """ call function with `args` as positional and `kwargs` as keyword 
     
    110870        self.__call__() 
    111871 
    112  
    113872    def poll(self): 
    114873        """ Return the state of execution. 
    115874        """ 
    116875        return getattr(self, "_status", None) 
    117      
    118      
     876 
    119877    def join(self, processEvents=True): 
    120878        """ Wait until the execution finishes. 
     
    124882            if processEvents and QThread.currentThread() is qApp.thread(): 
    125883                qApp.processEvents() 
    126                  
     884 
    127885    def get_result(self, processEvents=True): 
    128886        """ Block until the computation completes and return the call result. 
    129887        If the execution resulted in an exception, this method will re-raise 
    130         it.  
     888        it. 
    131889        """ 
    132890        self.join(processEvents=processEvents) 
    133         if self.poll() != 0:  
     891        if self.poll() != 0: 
    134892            # re-raise the error 
    135893            raise self._exc_info[0], self._exc_info[1] 
    136894        else: 
    137895            return self.result 
    138      
     896 
    139897    def emitAdvance(self, count=1): 
    140898        self.emit(SIGNAL("advance()")) 
    141899        self.emit(SIGNAL("advance(int)"), count) 
    142          
    143          
     900 
    144901    def emitProgressChanged(self, value): 
    145902        self.emit(SIGNAL("progressChanged(float)"), value) 
    146          
    147      
     903 
    148904    @pyqtSignature("moveToAndExecute(PyQt_PyObject)") 
    149905    def moveToAndExecute(self, thread): 
    150906        self.moveToThread(thread) 
    151          
    152         self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection) 
    153          
     907 
     908        self.connect(self, SIGNAL("_async_start()"), self.execute, 
     909                     Qt.QueuedConnection) 
     910 
    154911        self.emit(SIGNAL("_async_start()")) 
    155          
    156          
     912 
    157913    @pyqtSignature("moveToAndInit(PyQt_PyObject)") 
    158914    def moveToAndInit(self, thread): 
    159915        self.moveToThread(thread) 
    160          
    161         self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection) 
     916 
     917        self.connect(self, SIGNAL("_async_start()"), self.execute, 
     918                     Qt.QueuedConnection) 
    162919        self._connected = True 
    163          
     920 
    164921 
    165922class WorkerThread(QThread): 
     
    168925    def run(self): 
    169926        self.exec_() 
    170          
    171          
    172 class RunnableTask(QRunnable): 
     927 
     928 
     929class _RunnableTask(QRunnable): 
    173930    """ Wrapper for an AsyncCall 
    174931    """ 
     
    177934        self.setAutoDelete(False) 
    178935        self._call = call 
    179          
     936 
    180937    def run(self): 
    181938        if isinstance(self._call, AsyncCall): 
    182939            self.eventLoop = QEventLoop() 
    183940            self.eventLoop.processEvents() 
    184             QObject.connect(self._call, SIGNAL("finished(QString)"), lambda str: self.eventLoop.quit()) 
    185             QMetaObject.invokeMethod(self._call, "moveToAndInit", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", QThread.currentThread())) 
     941            QObject.connect(self._call, SIGNAL("finished(QString)"), 
     942                            lambda str: self.eventLoop.quit()) 
     943            QMetaObject.invokeMethod(self._call, "moveToAndInit", 
     944                                     Qt.QueuedConnection, 
     945                                     Q_ARG("PyQt_PyObject", 
     946                                           QThread.currentThread())) 
    186947            self.eventLoop.processEvents() 
    187948            self.eventLoop.exec_() 
    188949        else: 
    189950            self._return = self._call() 
    190              
    191              
    192 class RunnableAsyncCall(RunnableTask): 
     951 
     952 
     953class _RunnableAsyncCall(_RunnableTask): 
    193954    def run(self): 
    194955        self.eventLoop = QEventLoop() 
    195956        self.eventLoop.processEvents() 
    196         QObject.connect(self._call, SIGNAL("finished(QString)"), lambda str: self.eventLoop.quit()) 
    197         QMetaObject.invokeMethod(self._call, "moveToAndInit", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", QThread.currentThread())) 
     957        QObject.connect(self._call, SIGNAL("finished(QString)"), 
     958                        lambda str: self.eventLoop.quit()) 
     959        QMetaObject.invokeMethod(self._call, "moveToAndInit", 
     960                                 Qt.QueuedConnection, 
     961                                 Q_ARG("PyQt_PyObject", 
     962                                       QThread.currentThread())) 
    198963        self.eventLoop.processEvents() 
    199964        self.eventLoop.exec_() 
    200965 
    201 def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None, onFinished=None, onError=None, thread=None, threadPool=None): 
     966 
     967def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None, 
     968               onFinished=None, onError=None, thread=None, threadPool=None): 
    202969    async = AsyncCall(thread=thread, threadPool=threadPool) 
    203970    if onResult is not None: 
    204         async.connect(async, SIGNAL("resultReady(PyQt_PyObject)"), onResult, Qt.QueuedConnection) 
     971        async.connect(async, SIGNAL("resultReady(PyQt_PyObject)"), onResult, 
     972                      Qt.QueuedConnection) 
    205973    if onStarted is not None: 
    206         async.connect(async, SIGNAL("starting()"), onStarted, Qt.QueuedConnection) 
     974        async.connect(async, SIGNAL("starting()"), onStarted, 
     975                      Qt.QueuedConnection) 
    207976    if onFinished is not None: 
    208         async.connect(async, SIGNAL("finished(QString)"), onFinished, Qt.QueuedConnection) 
     977        async.connect(async, SIGNAL("finished(QString)"), onFinished, 
     978                      Qt.QueuedConnection) 
    209979    if onError is not None: 
    210         async.connect(async, SIGNAL("unhandledException(PyQt_PyObject)"), onError, Qt.QueuedConnection) 
     980        async.connect(async, SIGNAL("unhandledException(PyQt_PyObject)"), 
     981                      onError, Qt.QueuedConnection) 
    211982    async.apply_async(call, args, kwargs) 
    212983    return async 
    213          
    214 from functools import partial 
    215          
    216 class ProgressBar(QObject): 
    217     """ A thread safe progress callback using Qt's signal mechanism 
    218     to deliver progress updates to the GUI thread. Make sure this object instance 
    219     is created in the GUI thread or is a child of an object from the GUI thread 
    220     """ 
    221      
    222     def __init__(self, widget, iterations, parent=None): 
    223         QObject.__init__(self, parent) 
    224         assert (qApp.thread() is self.thread()) 
    225         self.iter = iterations 
    226         self.widget = widget 
    227         self.count = 0 
    228         self.widget.progressBarInit() 
    229          
    230     def advance(self, count=1): 
    231         self.count += count 
    232         value = int(self.count*100/self.iter) 
    233         QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", partial(self.widget.progressBarSet, value))) 
    234  
    235     def finish(self): 
    236         QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", self.widget.progressBarFinished)) 
    237          
    238     def progressBarSet(self, value): 
    239         QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", partial(self.widget.progressBarSet, value))) 
    240      
    241     @pyqtSignature("queuedInvoke(PyQt_PyObject)") 
    242     def queuedInvoke(self, func): 
    243         func() 
    244          
    245          
     984 
     985 
    246986class synchronized(object): 
    247987    def __init__(self, object, mode=QMutex.Recursive): 
     
    249989            object._mutex = QMutex(mode) 
    250990        self.mutex = object._mutex 
    251          
     991 
    252992    def __enter__(self): 
    253993        self.mutex.lock() 
    254994        return self 
    255      
     995 
    256996    def __exit__(self, exc_type=None, exc_value=None, tb=None): 
    257997        self.mutex.unlock() 
    258  
    259 _global_thread_pools = {} 
    260          
    261          
    262 def threadPool(self, class_="global", private=False, maxCount=None): 
    263     with synchronized(threadPool): 
    264         if private: 
    265             pools = self._private_thread_pools 
    266         else: 
    267             pools = _global_thread_pools 
    268              
    269         if class_ not in pools: 
    270             if class_ == "global": 
    271                 instance = QThreadPool.globalInstance() 
    272             else: 
    273                 instance = QThreadPool() 
    274                 instance.setMaxThreadCount(maxCount) 
    275             pools[class_] = instance 
    276         return pools[class_] 
    277      
    278 OWBaseWidget.threadPool = threadPool 
    279          
    280  
    281 """\ 
    282 A multiprocessing like API 
    283 ========================== 
    284  
    285 Incomplette 
    286 """ 
    287  
    288 class Process(AsyncCall): 
    289     _process_id = 0 
    290     def __init__(group=None, target=None, name=None, args=(), kwargs={}): 
    291         self.worker = WorkerThread() 
    292         AsyncCall.__init__(self, thread=self.worker) 
    293          
    294         self.conenct(self, SIGANL("finished(QString)"), self.onFinished, Qt.QueuedConnection) 
    295         self.connect(self, SIGNAL("finished(QString)"), lambda:self.worker.quit(), Qt.QueuedConnection) 
    296         self.target = target 
    297         self.args = args 
    298         self.kwargs = kwargs 
    299         if name is None: 
    300             self.name = "Process-%i" % self._process_id 
    301             Process._process_id += 1 
    302         else: 
    303             self.name = name 
    304         self.exitcode = -1 
    305              
    306     def start(self): 
    307         self.worker.start() 
    308         self.async_call(self.run) 
    309  
    310     def run(self): 
    311         self._result = self.target(*self.args, **self.kwargs) 
    312           
    313     def join(self): 
    314         while self.poll() is None: 
    315             time.sleep(10) 
    316  
    317     def is_alive(self): 
    318         return self.poll() is None 
    319      
    320     def onFinished(self, string): 
    321         self.exitcode = self._status 
    322          
    323     def terminate(self): 
    324         self.worker.terminate() 
    325      
    326 from Queue import Queue 
    327  
    328 class Pool(QObject): 
    329     def __init__(self, processes=None): 
    330         if processes is None: 
    331             import multiprocessing 
    332             processes = multiprocessing.cpu_count() 
    333         self.processes = processes 
    334         self.pool = [Process() for i in range(processes)] 
    335         self._i = 0 
    336     def get_process(self): 
    337         self._i = (self._i + 1) % len(self.pool) 
    338         return self.pool[self._i] 
    339       
    340     def apply_async(func, args, kwargs): 
    341         process = self.get_process() 
    342         process.start() 
    343          
    344     def start(self, ): 
    345         pass 
    346      
Note: See TracChangeset for help on using the changeset viewer.