source: orange/Orange/OrangeWidgets/OWConcurrent.py @ 11878:298c32bb4324

Revision 11878:298c32bb4324, 31.7 KB checked in by Ales Erjavec <ales.erjavec@…>, 5 weeks ago (diff)

Added 'add_done_callback' method to Future class.

Line 
1"""\
2OWConcurent
3===========
4
5General helper functions and classes for Orange Canvas
6concurrent programming
7
8"""
9from __future__ import with_statement
10
11import sys
12import threading
13import atexit
14import logging
15
16from functools import partial
17from contextlib import contextmanager
18
19from PyQt4.QtGui import qApp
20
21from PyQt4.QtCore import (
22    Qt, QObject, QMetaObject, QTimer, QThreadPool, QThread, QMutex,
23    QRunnable, QEventLoop, QCoreApplication, QEvent, QString, SIGNAL,
24    Q_ARG, pyqtSignature,
25)
26
27from PyQt4.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
28
29_log = logging.getLogger(__name__)
30
31
32@contextmanager
33def locked(mutex):
34    """
35    A context manager for locking an instance of a QMutex.
36    """
37    mutex.lock()
38    try:
39        yield
40    finally:
41        mutex.unlock()
42
43
44class _TaskDepotThread(QThread):
45    """
46    A special 'depot' thread used to transfer Task instance into threads
47    started by a QThreadPool.
48
49    """
50    _lock = threading.Lock()
51    _instance = None
52
53    def __new__(cls):
54        if _TaskDepotThread._instance is not None:
55            raise RuntimeError("Already exists")
56        return QThread.__new__(cls)
57
58    def __init__(self):
59        QThread.__init__(self)
60        self.start()
61        # Need to handle queued method calls from this thread.
62        self.moveToThread(self)
63        atexit.register(self._cleanup)
64
65    def _cleanup(self):
66        self.quit()
67        self.wait()
68
69    @staticmethod
70    def instance():
71        with _TaskDepotThread._lock:
72            if _TaskDepotThread._instance is None:
73                _TaskDepotThread._instance = _TaskDepotThread()
74            return _TaskDepotThread._instance
75
76    def run(self):
77        """
78        Reimplemented from `QThread.run`
79        """
80        # Start the event loop.
81        # On some old Qt4/PyQt4 installations base QThread.run does not seem
82        # to enter the loop, despite being documented to do so.
83        self.exec_()
84
85    @Slot(object, object)
86    def transfer(self, obj, thread):
87        """
88        Transfer `obj` (:class:`QObject`) instance from this thread to the
89        target `thread` (a :class:`QThread`).
90
91        """
92        assert obj.thread() is self
93        assert QThread.currentThread() is self
94        obj.moveToThread(thread)
95
96    def __del__(self):
97        self._cleanup()
98
99
100class _TaskRunnable(QRunnable):
101    """
102    A QRunnable for running a :class:`Task` by a :class:`ThreadExecuter`.
103    """
104    def __init__(self, future, task, args, kwargs):
105        QRunnable.__init__(self)
106        self.future = future
107        self.task = task
108        self.args = args
109        self.kwargs = kwargs
110        self.eventLoop = None
111
112    def run(self):
113        """
114        Reimplemented from `QRunnable.run`
115        """
116        self.eventLoop = QEventLoop()
117        self.eventLoop.processEvents()
118
119        # Move the task to the current thread so it's events, signals, slots
120        # are triggered from this thread.
121        assert self.task.thread() is _TaskDepotThread.instance()
122
123        QMetaObject.invokeMethod(
124            self.task.thread(), "transfer", Qt.BlockingQueuedConnection,
125            Q_ARG(object, self.task),
126            Q_ARG(object, QThread.currentThread())
127        )
128
129        self.eventLoop.processEvents()
130
131        # Schedule task.run from the event loop.
132        self.task.start()
133
134        # Quit the loop and exit when task finishes or is cancelled.
135        self.task.finished.connect(self.eventLoop.quit)
136        self.task.cancelled.connect(self.eventLoop.quit)
137        self.eventLoop.exec_()
138
139
140class _Runnable(QRunnable):
141    """
142    A QRunnable for running plain functions by a :class:`ThreadExecuter`.
143    """
144    def __init__(self, future, func, args, kwargs):
145        QRunnable.__init__(self)
146        self.future = future
147        self.func = func
148        self.args = args
149        self.kwargs = kwargs
150
151    def run(self):
152        """
153        Reimplemented from QRunnable.run
154        """
155        try:
156            if not self.future.set_running_or_notify_cancel():
157                # Was cancelled
158                return
159            try:
160                result = self.func(*self.args, **self.kwargs)
161            except BaseException, ex:
162                self.future.set_exception(ex)
163            else:
164                self.future.set_result(result)
165        except BaseException:
166            _log.critical("Exception in worker thread.", exc_info=True)
167
168
169class ThreadExecutor(QObject):
170    """
171    ThreadExceuter object class provides an interface for running tasks
172    in a thread pool.
173
174    :param QObject parent:
175        Executor's parent instance.
176
177    :param QThreadPool threadPool:
178        Thread pool to be used by the instance of the Executor. If `None`
179        then ``QThreadPool.globalInstance()`` will be used.
180
181    """
182    def __init__(self, parent=None, threadPool=None):
183        QObject.__init__(self, parent)
184        if threadPool is None:
185            threadPool = QThreadPool.globalInstance()
186        self._threadPool = threadPool
187        self._depot_thread = None
188        self._futures = []
189        self._shutdown = False
190        self._state_lock = threading.Lock()
191
192    def _get_depot_thread(self):
193        if self._depot_thread is None:
194            self._depot_thread = _TaskDepotThread.instance()
195        return self._depot_thread
196
197    def submit(self, func, *args, **kwargs):
198        """
199        Schedule the `func(*args, **kwargs)` to be executed and return an
200        :class:`Future` instance representing the result of the computation.
201
202        """
203        with self._state_lock:
204            if self._shutdown:
205                raise RuntimeError("Cannot schedule new futures after " +
206                                   "shutdown.")
207
208            if isinstance(func, Task):
209                f, runnable = self.__make_task_runnable(func)
210            else:
211                f = Future()
212                runnable = _Runnable(f, func, args, kwargs)
213
214            self._futures.append(f)
215            f._watchers.append(self._future_state_change)
216            self._threadPool.start(runnable)
217            return f
218
219    def submit_task(self, task):
220        with self._state_lock:
221            if self._shutdown:
222                raise RuntimeError("Cannot schedule new futures after " +
223                                   "shutdown.")
224
225            f, runnable = self.__make_task_runnable(task)
226
227            self._futures.append(f)
228            f._watchers.append(self._future_state_change)
229            self._threadPool.start(runnable)
230            return f
231
232    def __make_task_runnable(self, task):
233        if task.thread() is not QThread.currentThread():
234            raise ValueError("Can only submit Tasks from it's own " +
235                             "thread.")
236
237        if task.parent() is not None:
238            raise ValueError("Can not submit Tasks with a parent.")
239
240        task.moveToThread(self._get_depot_thread())
241
242        # Use the Task's own Future object
243        f = task.future()
244        runnable = _TaskRunnable(f, task, (), {})
245        return (f, runnable)
246
247    def map(self, func, *iterables):
248        futures = [self.submit(func, *args) for args in zip(*iterables)]
249
250        for f in futures:
251            yield f.result()
252
253    def shutdown(self, wait=True):
254        """
255        Shutdown the executor and free all resources. If `wait` is True then
256        wait until all pending futures are executed or cancelled.
257        """
258        with self._state_lock:
259            self._shutdown = True
260
261        if wait:
262            # Wait until all futures have completed.
263            for future in list(self._futures):
264                try:
265                    future.exception()
266                except (TimeoutError, CancelledError):
267                    pass
268
269    def _future_state_change(self, future, state):
270        # Remove futures when finished.
271        if state == Future.Finished:
272            self._futures.remove(future)
273
274
275class ExecuteCallEvent(QEvent):
276    """
277    Represents an function call from the event loop (used by :class:`Task`
278    to schedule the :func:`Task.run` method to be invoked)
279
280    """
281    ExecuteCall = QEvent.registerEventType()
282
283    def __init__(self):
284        QEvent.__init__(self, ExecuteCallEvent.ExecuteCall)
285
286
287class Task(QObject):
288    """
289    """
290    started = Signal()
291    finished = Signal()
292    cancelled = Signal()
293    resultReady = Signal(object)
294    exceptionReady = Signal(Exception)
295
296    def __init__(self, parent=None, function=None):
297        QObject.__init__(self, parent)
298        self.function = function
299
300        self._future = Future()
301
302    def run(self):
303        if self.function is None:
304            raise NotImplementedError
305        else:
306            return self.function()
307
308    def start(self):
309        QCoreApplication.postEvent(self, ExecuteCallEvent())
310
311    def future(self):
312        return self._future
313
314    def result(self, timeout=None):
315        return self._future.result(timeout)
316
317    def _execute(self):
318        try:
319            if not self._future.set_running_or_notify_cancel():
320                self.cancelled.emit()
321                return
322
323            self.started.emit()
324            try:
325                result = self.run()
326            except BaseException, ex:
327                self._future.set_exception(ex)
328                self.exceptionReady.emit(ex)
329            else:
330                self._future.set_result(result)
331                self.resultReady.emit(result)
332
333            self.finished.emit()
334        except BaseException:
335            _log.critical("Exception in Task", exc_info=True)
336
337    def customEvent(self, event):
338        if event.type() == ExecuteCallEvent.ExecuteCall:
339            self._execute()
340        else:
341            QObject.customEvent(self, event)
342
343
344def futures_iter(futures):
345    for f in futures:
346        yield f.result()
347
348
349class TimeoutError(Exception):
350    pass
351
352
353class CancelledError(Exception):
354    pass
355
356
357class Future(object):
358    """
359    Represents a result of an asynchronous computation.
360    """
361    Pending, Canceled, Running, Finished = 1, 2, 4, 8
362
363    def __init__(self):
364        self._watchers = []
365        self._state = Future.Pending
366        self._condition = threading.Condition()
367        self._result = None
368        self._exception = None
369        self._done_callbacks = []
370
371    def _set_state(self, state):
372        if self._state != state:
373            self._state = state
374            for watcher in self._watchers:
375                watcher(self, state)
376
377    def cancel(self):
378        """
379        Attempt to cancel the the call. Return `False` if the call is
380        already in progress and cannot be canceled, otherwise return `True`.
381
382        """
383        with self._condition:
384            if self._state in [Future.Running, Future.Finished]:
385                return False
386            elif self._state == Future.Canceled:
387                return True
388            else:
389                self._set_state(Future.Canceled)
390                self._condition.notify_all()
391
392        self._invoke_callbacks()
393        return True
394
395    def cancelled(self):
396        """
397        Return `True` if call was successfully cancelled.
398        """
399        with self._condition:
400            return self._state == Future.Canceled
401
402    def done(self):
403        """
404        Return `True` if the call was successfully cancelled or finished
405        running.
406
407        """
408        with self._condition:
409            return self._state in [Future.Canceled, Future.Finished]
410
411    def running(self):
412        """
413        Return True if the call is currently being executed.
414        """
415        with self._condition:
416            return self._state == Future.Running
417
418    def _get_result(self):
419        if self._exception:
420            raise self._exception
421        else:
422            return self._result
423
424    def result(self, timeout=None):
425        """
426        Return the result of the :class:`Futures` computation. If `timeout`
427        is `None` the call will block until either the computation finished
428        or is cancelled.
429        """
430        with self._condition:
431            if self._state == Future.Finished:
432                return self._get_result()
433            elif self._state == Future.Canceled:
434                raise CancelledError()
435
436            self._condition.wait(timeout)
437
438            if self._state == Future.Finished:
439                return self._get_result()
440            elif self._state == Future.Canceled:
441                raise CancelledError()
442            else:
443                raise TimeoutError()
444
445    def exception(self, timeout=None):
446        """
447        Return the exception instance (if any) resulting from the execution
448        of the :class:`Future`. Can raise a :class:`CancelledError` if the
449        computation was cancelled.
450
451        """
452        with self._condition:
453            if self._state == Future.Finished:
454                return self._exception
455            elif self._state == Future.Canceled:
456                raise CancelledError()
457
458            self._condition.wait(timeout)
459
460            if self._state == Future.Finished:
461                return self._exception
462            elif self._state == Future.Canceled:
463                raise CancelledError()
464            else:
465                raise TimeoutError()
466
467    def set_result(self, result):
468        """
469        Set the result of the computation (called by the worker thread).
470        """
471        with self._condition:
472            self._result = result
473            self._set_state(Future.Finished)
474            self._condition.notify_all()
475
476        self._invoke_callbacks()
477
478    def set_exception(self, exception):
479        """
480        Set the exception instance that was raised by the computation
481        (called by the worker thread).
482
483        """
484        with self._condition:
485            self._exception = exception
486            self._set_state(Future.Finished)
487            self._condition.notify_all()
488
489        self._invoke_callbacks()
490
491    def add_done_callback(self, fn):
492        with self._condition:
493            if self._state not in [Future.Finished, Future.Canceled]:
494                self._done_callbacks.append(fn)
495                return
496        # Already done
497        fn(self)
498
499    def set_running_or_notify_cancel(self):
500        with self._condition:
501            if self._state == Future.Canceled:
502                return False
503            elif self._state == Future.Pending:
504                self._set_state(Future.Running)
505                return True
506            else:
507                raise Exception()
508
509    def _invoke_callbacks(self):
510        for callback in self._done_callbacks:
511            try:
512                callback(self)
513            except Exception:
514                pass
515
516
517class StateChangedEvent(QEvent):
518    """
519    Represents a change in the internal state of a :class:`Future`.
520    """
521    StateChanged = QEvent.registerEventType()
522
523    def __init__(self, state):
524        QEvent.__init__(self, StateChangedEvent.StateChanged)
525        self._state = state
526
527    def state(self):
528        """
529        Return the new state (Future.Pending, Future.Cancelled, ...).
530        """
531        return self._state
532
533
534class FutureWatcher(QObject):
535    """
536    A `FutureWatcher` class provides a convenient interface to the
537    :class:`Future` instance using Qt's signals.
538
539    :param :class:`Future` future:
540        A :class:`Future` instance to watch.
541    :param :class:`QObject` parent:
542        Object's parent instance.
543
544    """
545    #: The future was cancelled.
546    cancelled = Signal()
547
548    #: The future has finished.
549    finished = Signal()
550
551    #: The future has started computation.
552    started = Signal()
553
554    def __init__(self, future, parent=None):
555        QObject.__init__(self, parent)
556        self._future = future
557
558        self._future._watchers.append(self._stateChanged)
559
560    def isCancelled(self):
561        """
562        Was the future cancelled.
563        """
564        return self._future.cancelled()
565
566    def isDone(self):
567        """
568        Is the future done (was cancelled or has finished).
569        """
570        return self._future.done()
571
572    def isRunning(self):
573        """
574        Is the future running (i.e. has started).
575        """
576        return self._future.running()
577
578    def isStarted(self):
579        """
580        Has the future computation started.
581        """
582        return self._future.running()
583
584    def result(self):
585        """
586        Return the result of the computation.
587        """
588        return self._future.result()
589
590    def exception(self):
591        """
592        Return the exception instance or `None` if no exception was raised.
593        """
594        return self._future.exception()
595
596    def customEvent(self, event):
597        """
598        Reimplemented from `QObject.customEvent`.
599        """
600        if event.type() == StateChangedEvent.StateChanged:
601            if event.state() == Future.Canceled:
602                self.cancelled.emit()
603            elif event.state() == Future.Running:
604                self.started.emit()
605            elif event.state() == Future.Finished:
606                self.finished.emit()
607            return
608
609        return QObject.customEvent(self, event)
610
611    def _stateChanged(self, future, state):
612        """
613        The `future` state has changed (called by :class:`Future`).
614        """
615        ev = StateChangedEvent(state)
616
617        if self.thread() is QThread.currentThread():
618            QCoreApplication.sendEvent(self, ev)
619        else:
620            QCoreApplication.postEvent(self, ev)
621
622
623class methodinvoke(object):
624    """
625    Create an QObject method wrapper that invokes the method asynchronously
626    in the object's own thread.
627
628    :param obj:
629        A QObject instance.
630    :param str method:
631        The method name.
632    :param tuple arg_types:
633        A tuple of positional argument types.
634
635    """
636    def __init__(self, obj, method, arg_types=()):
637        self.obj = obj
638        self.method = method
639        self.arg_types = tuple(arg_types)
640
641    def __call__(self, *args):
642        args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)]
643        QMetaObject.invokeMethod(
644            self.obj, self.method, Qt.QueuedConnection,
645            *args
646        )
647
648
649try:
650    import unittest2 as unittest
651except ImportError:
652    import unittest
653
654
655class TestFutures(unittest.TestCase):
656
657    def test_futures(self):
658        f = Future()
659        self.assertEqual(f.done(),  False)
660        self.assertEqual(f.running(),  False)
661
662        self.assertTrue(f.cancel())
663        self.assertTrue(f.cancelled())
664
665        with self.assertRaises(CancelledError):
666            f.result()
667
668        with self.assertRaises(CancelledError):
669            f.exception()
670
671        f = Future()
672        f.set_running_or_notify_cancel()
673
674        with self.assertRaises(TimeoutError):
675            f.result(0.1)
676
677        with self.assertRaises(TimeoutError):
678            f.exception(0.1)
679
680        f = Future()
681        f.set_running_or_notify_cancel()
682        f.set_result("result")
683
684        self.assertEqual(f.result(), "result")
685        self.assertEqual(f.exception(), None)
686
687        f = Future()
688        f.set_running_or_notify_cancel()
689
690        f.set_exception(Exception("foo"))
691
692        with self.assertRaises(Exception):
693            f.result()
694
695        class Ref():
696            def __init__(self, ref):
697                self.ref = ref
698
699            def set(self, ref):
700                self.ref = ref
701
702        # Test that done callbacks are called.
703        called = Ref(False)
704        f = Future()
705        f.add_done_callback(lambda f: called.set(True))
706        f.set_result(None)
707        self.assertTrue(called.ref)
708
709        # Test that callbacks are called when cancelled.
710        called = Ref(False)
711        f = Future()
712        f.add_done_callback(lambda f: called.set(True))
713        f.cancel()
714        self.assertTrue(called.ref)
715
716        # Test that callbacks are called immediately when the future is
717        # already done.
718        called = Ref(False)
719        f = Future()
720        f.set_result(None)
721        f.add_done_callback(lambda f: called.set(True))
722        self.assertTrue(called.ref)
723
724        count = Ref(0)
725        f = Future()
726        f.add_done_callback(lambda f: count.set(count.ref + 1))
727        f.add_done_callback(lambda f: count.set(count.ref + 1))
728        f.set_result(None)
729        self.assertEqual(count.ref, 2)
730
731        # Test that the callbacks are called with the future as argument.
732        done_future = Ref(None)
733        f = Future()
734        f.add_done_callback(lambda f: done_future.set(f))
735        f.set_result(None)
736        self.assertIs(f, done_future.ref)
737
738
739class TestExecutor(unittest.TestCase):
740
741    def setUp(self):
742        self.app = QCoreApplication([])
743
744    def test_executor(self):
745        executor = ThreadExecutor()
746        f1 = executor.submit(pow, 100, 100)
747
748        f2 = executor.submit(lambda: 1 / 0)
749
750        f3 = executor.submit(QThread.currentThread)
751
752        self.assertTrue(f1.result(), pow(100, 100))
753
754        with self.assertRaises(ZeroDivisionError):
755            f2.result()
756
757        self.assertIsInstance(f2.exception(), ZeroDivisionError)
758
759        self.assertIsNot(f3.result(), QThread.currentThread())
760
761    def test_methodinvoke(self):
762        executor = ThreadExecutor()
763        state = [None, None]
764
765        class StateSetter(QObject):
766            @Slot(object)
767            def set_state(self, value):
768                state[0] = value
769                state[1] = QThread.currentThread()
770
771        def func(callback):
772            callback(QThread.currentThread())
773
774        obj = StateSetter()
775        f1 = executor.submit(func, methodinvoke(obj, "set_state", (object,)))
776        f1.result()
777
778        # So invoked method can be called
779        QCoreApplication.processEvents()
780
781        self.assertIs(state[1], QThread.currentThread(),
782                      "set_state was called from the wrong thread")
783
784        self.assertIsNot(state[0], QThread.currentThread(),
785                         "set_state was invoked in the main thread")
786
787        executor.shutdown(wait=True)
788
789    def test_executor_map(self):
790        executor = ThreadExecutor()
791
792        r = executor.map(pow, range(1000), range(1000))
793
794        results = list(r)
795
796        self.assertTrue(len(results) == 1000)
797
798
799class TestFutureWatcher(unittest.TestCase):
800    def setUp(self):
801        self.app = QCoreApplication([])
802
803    def test_watcher(self):
804        executor = ThreadExecutor()
805        f = executor.submit(QThread.currentThread)
806        watcher = FutureWatcher(f)
807
808        if f.cancel():
809            self.assertTrue(watcher.isCancelled())
810
811        executor.shutdown()
812
813
814class TestTask(unittest.TestCase):
815    def setUp(self):
816        self.app = QCoreApplication([])
817
818    def test_task(self):
819        results = []
820
821        task = Task(function=QThread.currentThread)
822        task.resultReady.connect(results.append)
823
824        task.start()
825        self.app.processEvents()
826
827        self.assertSequenceEqual(results, [QThread.currentThread()])
828
829        results = []
830
831        thread = QThread()
832        thread.start()
833
834        task = Task(function=QThread.currentThread)
835
836        task.moveToThread(thread)
837
838        self.assertIsNot(task.thread(), QThread.currentThread())
839        self.assertIs(task.thread(), thread)
840
841        task.resultReady.connect(results.append, Qt.DirectConnection)
842        task.start()
843
844        f = task.future()
845
846        self.assertIsNot(f.result(3), QThread.currentThread())
847
848        self.assertIs(f.result(3), results[-1])
849
850    def test_executor(self):
851        executor = ThreadExecutor()
852
853        f = executor.submit(QThread.currentThread)
854
855        self.assertIsNot(f.result(3), QThread.currentThread())
856
857        f = executor.submit(lambda: 1 / 0)
858
859        with self.assertRaises(ZeroDivisionError):
860            f.result()
861
862        results = []
863        task = Task(function=QThread.currentThread)
864        task.resultReady.connect(results.append, Qt.DirectConnection)
865
866        f = executor.submit(task)
867
868        self.assertIsNot(f.result(3), QThread.currentThread())
869
870        executor.shutdown()
871
872
873############################################
874# DEPRECATED (not to mention extremely ugly)
875############################################
876
877
878class AsyncCall(QObject):
879    """
880    A wrapper class for async function calls using Qt's signals for
881    communication between threads
882
883    Arguments:
884        - `func`: a function to execute
885        - `thread`: a QThread instance to execute under (default `None`,
886           threadPool is used instead)
887        - `threadPool`: a QThreadPool instance to handle thread allocation
888           (for this to work `thread` argument must be `None`)
889        - `parent`: parent object (should be None for most cases)
890
891    Signals:
892        - `starting()`
893        - `finished(QString)` - emitted when finished with an argument 'OK'
894          on success or repr(ex) on error.
895        - `finished(PyQt_PyObject, QString)` - same as above but also
896          pass self as an argument.
897        - `unhandledException(PyQt_PyObject)` - emitted on error
898          with `sys.exc_info()` argument.
899        - `resultReady(PyQt_PyObject)` - emitted on success with
900          function call result as argument.
901
902    """
903
904    def __init__(self, func=None, args=(), kwargs={}, thread=None,
905                 threadPool=None, parent=None):
906        QObject.__init__(self, parent)
907        self.func = func
908        self._args = args
909        self._kwargs = kwargs
910        self.threadPool = None
911
912        self._connected = True
913        self._cancelRequested = False
914        self._started = False
915        self._cancelled = False
916
917        if thread is not None:
918            self.moveToThread(thread)
919        else:
920            if threadPool is None:
921                threadPool = QThreadPool.globalInstance()
922            self.threadPool = threadPool
923            self._runnable = _RunnableAsyncCall(self)
924            self.threadPool.start(self._runnable)
925            self._connected = False
926            return
927
928        self.connect(self, SIGNAL("_async_start()"), self.execute,
929                     Qt.QueuedConnection)
930
931    @pyqtSignature("execute()")
932    def execute(self):
933        """ Never call directly, use `__call__` or `apply_async` instead
934        """
935        assert(self.thread() is QThread.currentThread())
936        if self._cancelRequested:
937            self._cancelled = True
938            self._status = 2
939            self.emit(SIGNAL("finished(QString)"), QString("Cancelled"))
940            return
941
942        self._started = True
943        self.emit(SIGNAL("starting()"))
944        try:
945            self.result = self.func(*self._args, **self._kwargs)
946        except Exception, ex:
947            print >> sys.stderr, "Exception in thread ", \
948                  QThread.currentThread(), " while calling ", self.func
949            self.emit(SIGNAL("finished(QString)"), QString(repr(ex)))
950            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
951                      self, QString(repr(ex)))
952            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"),
953                      sys.exc_info())
954
955            self._exc_info = sys.exc_info()
956            self._status = 1
957            return
958
959        self.emit(SIGNAL("finished(QString)"), QString("Ok"))
960        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
961                  self, QString("Ok"))
962        self.emit(SIGNAL("resultReady(PyQt_PyObject)"),
963                  self.result)
964        self._status = 0
965
966    def __call__(self, *args, **kwargs):
967        """ Apply the call with args and kwargs additional arguments
968        """
969        if args or kwargs:
970            self.func = partial(self.func, *self._args, **self._kwargs)
971            self._args, self._kwargs = args, kwargs
972
973        if not self._connected:
974            # delay until event loop initialized by _RunnableAsyncCall
975            QTimer.singleShot(50, self.__call__)
976            return
977        else:
978            self.emit(SIGNAL("_async_start()"))
979
980    def apply_async(self, func, args, kwargs):
981        """ call function with `args` as positional and `kwargs` as keyword
982        arguments (Overrides __init__ arguments).
983        """
984        self.func, self._args, self._kwargs = func, args, kwargs
985        self.__call__()
986
987    def poll(self):
988        """ Return the state of execution.
989        """
990        return getattr(self, "_status", None)
991
992    def join(self, processEvents=True):
993        """ Wait until the execution finishes.
994        """
995        while self.poll() is None:
996            QThread.currentThread().msleep(50)
997            if processEvents and QThread.currentThread() is qApp.thread():
998                qApp.processEvents()
999
1000    def get_result(self, processEvents=True):
1001        """ Block until the computation completes and return the call result.
1002        If the execution resulted in an exception, this method will re-raise
1003        it.
1004        """
1005        self.join(processEvents=processEvents)
1006        if self.poll() != 0:
1007            # re-raise the error
1008            raise self._exc_info[0], self._exc_info[1]
1009        else:
1010            return self.result
1011
1012    def emitAdvance(self, count=1):
1013        self.emit(SIGNAL("advance()"))
1014        self.emit(SIGNAL("advance(int)"), count)
1015
1016    def emitProgressChanged(self, value):
1017        self.emit(SIGNAL("progressChanged(float)"), value)
1018
1019    @pyqtSignature("moveToAndExecute(PyQt_PyObject)")
1020    def moveToAndExecute(self, thread):
1021        self.moveToThread(thread)
1022
1023        self.connect(self, SIGNAL("_async_start()"), self.execute,
1024                     Qt.QueuedConnection)
1025
1026        self.emit(SIGNAL("_async_start()"))
1027
1028    @pyqtSignature("moveToAndInit(PyQt_PyObject)")
1029    def moveToAndInit(self, thread):
1030        self.moveToThread(thread)
1031
1032        self.connect(self, SIGNAL("_async_start()"), self.execute,
1033                     Qt.QueuedConnection)
1034        self._connected = True
1035
1036
1037class WorkerThread(QThread):
1038    """ A worker thread
1039    """
1040    def run(self):
1041        self.exec_()
1042
1043
1044class _RunnableTask(QRunnable):
1045    """ Wrapper for an AsyncCall
1046    """
1047    def __init__(self, call):
1048        QRunnable.__init__(self)
1049        self.setAutoDelete(False)
1050        self._call = call
1051
1052    def run(self):
1053        if isinstance(self._call, AsyncCall):
1054            self.eventLoop = QEventLoop()
1055            self.eventLoop.processEvents()
1056            QObject.connect(self._call, SIGNAL("finished(QString)"),
1057                            lambda str: self.eventLoop.quit())
1058            QMetaObject.invokeMethod(self._call, "moveToAndInit",
1059                                     Qt.QueuedConnection,
1060                                     Q_ARG("PyQt_PyObject",
1061                                           QThread.currentThread()))
1062            self.eventLoop.processEvents()
1063            self.eventLoop.exec_()
1064        else:
1065            self._return = self._call()
1066
1067
1068class _RunnableAsyncCall(_RunnableTask):
1069    def run(self):
1070        self.eventLoop = QEventLoop()
1071        self.eventLoop.processEvents()
1072        QObject.connect(self._call, SIGNAL("finished(QString)"),
1073                        lambda str: self.eventLoop.quit())
1074        QMetaObject.invokeMethod(self._call, "moveToAndInit",
1075                                 Qt.QueuedConnection,
1076                                 Q_ARG("PyQt_PyObject",
1077                                       QThread.currentThread()))
1078        self.eventLoop.processEvents()
1079        self.eventLoop.exec_()
1080
1081
1082def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None,
1083               onFinished=None, onError=None, thread=None, threadPool=None):
1084    async = AsyncCall(thread=thread, threadPool=threadPool)
1085    if onResult is not None:
1086        async.connect(async, SIGNAL("resultReady(PyQt_PyObject)"), onResult,
1087                      Qt.QueuedConnection)
1088    if onStarted is not None:
1089        async.connect(async, SIGNAL("starting()"), onStarted,
1090                      Qt.QueuedConnection)
1091    if onFinished is not None:
1092        async.connect(async, SIGNAL("finished(QString)"), onFinished,
1093                      Qt.QueuedConnection)
1094    if onError is not None:
1095        async.connect(async, SIGNAL("unhandledException(PyQt_PyObject)"),
1096                      onError, Qt.QueuedConnection)
1097    async.apply_async(call, args, kwargs)
1098    return async
1099
1100
1101class synchronized(object):
1102    def __init__(self, object, mode=QMutex.Recursive):
1103        if not hasattr(object, "_mutex"):
1104            object._mutex = QMutex(mode)
1105        self.mutex = object._mutex
1106
1107    def __enter__(self):
1108        self.mutex.lock()
1109        return self
1110
1111    def __exit__(self, exc_type=None, exc_value=None, tb=None):
1112        self.mutex.unlock()
Note: See TracBrowser for help on using the repository browser.