source: orange/Orange/OrangeWidgets/OWConcurrent.py @ 11779:156bb31cf132

Revision 11779:156bb31cf132, 29.3 KB checked in by Ales Erjavec <ales.erjavec@…>, 4 months ago (diff)

ThreadExecutor should only wait for futures submitted through it.

This is because different executors can share the same thread pool.

Line 
1"""\
2OWConcurrent
3============
4
5General helper functions and classes for Orange Canvas
6concurrent programming
7
8"""
9from __future__ import with_statement
10
11import sys
12import threading
13import logging
14
15from functools import partial
16from contextlib import contextmanager
17
18from PyQt4.QtGui import qApp
19
20from PyQt4.QtCore import (
21    Qt, QObject, QMetaObject, QTimer, QThreadPool, QThread, QMutex,
22    QRunnable, QEventLoop, QCoreApplication, QEvent, QString, SIGNAL,
23    Q_ARG, pyqtSignature,
24)
25
26from PyQt4.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
27
28_log = logging.getLogger(__name__)
29
30
@contextmanager
def locked(mutex):
    """
    Hold `mutex` for the duration of a ``with`` block.

    `mutex` is any object exposing ``lock()`` and ``unlock()`` (a QMutex in
    this module). It is locked on entry and unlocked on exit, including
    when the body of the ``with`` block raises.
    """
    mutex.lock()
    try:
        # Control returns here once the `with` body has finished or raised.
        yield
    finally:
        mutex.unlock()
41
42
class _TaskDepotThread(QThread):
    """
    A special 'depot' thread used to transfer Task instance into threads
    started by a QThreadPool.

    :class:`ThreadExecutor` moves a submitted :class:`Task` into this
    thread; the pool worker that later runs the task invokes the
    `transfer` slot to have the task moved into the worker's own thread.

    """
    def start(self):
        """
        Reimplemented from `QThread.start`

        After starting, the thread object moves itself into the thread it
        represents so that slots invoked on it (see `transfer`) execute
        inside this thread.
        """
        QThread.start(self)
        # Need to also handle method invoke from this thread
        self.moveToThread(self)

    def run(self):
        """
        Reimplemented from `QThread.run`
        """
        # Start the event loop.
        # On some old Qt4/PyQt4 installations base QThread.run does not seem
        # to enter the loop, despite being documented to do so.
        self.exec_()

    @Slot(object, object)
    def transfer(self, obj, thread):
        """
        Transfer `obj` (:class:`QObject`) instance from this thread to the
        target `thread` (a :class:`QThread`).

        Must be executed in the depot thread itself, with `obj` currently
        living in the depot thread (both conditions are asserted).

        """
        assert obj.thread() is self
        assert QThread.currentThread() is self
        obj.moveToThread(thread)
76
77
class _TaskRunnable(QRunnable):
    """
    A QRunnable for running a :class:`Task` by a :class:`ThreadExecutor`.

    The runnable pulls the task out of the depot thread into the pool
    thread it is executed in, starts the task and then spins a local event
    loop until the task reports that it finished or was cancelled.
    """
    def __init__(self, future, task, args, kwargs):
        QRunnable.__init__(self)
        # `future`, `args` and `kwargs` are only stored here; `run` does not
        # read them.
        self.future = future
        self.task = task
        self.args = args
        self.kwargs = kwargs
        # Created in `run`, i.e. inside the pool thread.
        self.eventLoop = None

    def run(self):
        """
        Reimplemented from `QRunnable.run`
        """
        self.eventLoop = QEventLoop()
        self.eventLoop.processEvents()

        # Move the task to the current thread so it's events, signals, slots
        # are triggered from this thread.
        assert isinstance(self.task.thread(), _TaskDepotThread)
        # Blocking call: returns only after the depot thread has executed
        # `transfer(task, <this thread>)`.
        QMetaObject.invokeMethod(
            self.task.thread(), "transfer", Qt.BlockingQueuedConnection,
            Q_ARG(object, self.task),
            Q_ARG(object, QThread.currentThread())
        )

        self.eventLoop.processEvents()

        # Schedule task.run from the event loop.
        self.task.start()

        # Quit the loop and exit when task finishes or is cancelled.
        # TODO: If the task encounters an critical error it might not emit
        # these signals and this Runnable will never complete.
        self.task.finished.connect(self.eventLoop.quit)
        self.task.cancelled.connect(self.eventLoop.quit)
        self.eventLoop.exec_()
117
118
class _Runnable(QRunnable):
    """
    A QRunnable for running plain functions by a :class:`ThreadExecutor`.

    :param future:
        The :class:`Future` that receives the result or the exception.
    :param func:
        The callable to execute.
    :param tuple args:
        Positional arguments for `func`.
    :param dict kwargs:
        Keyword arguments for `func`.
    """
    def __init__(self, future, func, args, kwargs):
        QRunnable.__init__(self)
        self.future = future
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """
        Reimplemented from QRunnable.run

        Calls ``func(*args, **kwargs)`` unless the future was cancelled and
        stores the outcome in the future. Nothing is allowed to propagate
        out of the worker thread; unexpected errors are logged instead.
        """
        try:
            if not self.future.set_running_or_notify_cancel():
                # Was cancelled
                return
            try:
                result = self.func(*self.args, **self.kwargs)
            except BaseException as ex:
                # Every error raised by `func` belongs to the caller and is
                # re-raised from `Future.result()`.
                self.future.set_exception(ex)
            else:
                self.future.set_result(result)
        except BaseException:
            _log.critical("Exception in worker thread.", exc_info=True)
146
147
class ThreadExecutor(QObject):
    """
    ThreadExecutor object class provides an interface for running tasks
    in a thread pool.

    :param QObject parent:
        Executor's parent instance.

    :param QThreadPool threadPool:
        Thread pool to be used by the instance of the Executor. If `None`
        then ``QThreadPool.globalInstance()`` will be used.

    """
    def __init__(self, parent=None, threadPool=None):
        QObject.__init__(self, parent)
        if threadPool is None:
            threadPool = QThreadPool.globalInstance()
        self._threadPool = threadPool
        # Created lazily, only when the first Task is submitted.
        self._depot_thread = None
        # Futures submitted through this executor that are not done yet.
        self._futures = []
        self._shutdown = False
        self._state_lock = threading.Lock()

    def _get_depot_thread(self):
        """
        Return the depot thread, creating and starting it on first use.
        """
        if self._depot_thread is None:
            self._depot_thread = _TaskDepotThread()
            self._depot_thread.start()

        return self._depot_thread

    def submit(self, func, *args, **kwargs):
        """
        Schedule the `func(*args, **kwargs)` to be executed and return an
        :class:`Future` instance representing the result of the computation.

        `func` may also be a :class:`Task`; it must live in the calling
        thread and have no parent. The task's own future is returned.

        :raises RuntimeError: if the executor was already shut down.
        :raises ValueError: if a Task is submitted from a foreign thread or
            has a parent.

        """
        with self._state_lock:
            if self._shutdown:
                raise RuntimeError("Cannot schedule new futures after " +
                                   "shutdown.")

            if isinstance(func, Task):
                if func.thread() is not QThread.currentThread():
                    raise ValueError("Can only submit Tasks from it's own " +
                                     "thread.")

                if func.parent() is not None:
                    raise ValueError("Can not submit Tasks with a parent.")

                func.moveToThread(self._get_depot_thread())
                assert func.thread() is self._get_depot_thread()
                # Use the Task's own Future object
                f = func.future()
                runnable = _TaskRunnable(f, func, args, kwargs)
            else:
                f = Future()
                runnable = _Runnable(f, func, args, kwargs)

            self._futures.append(f)
            f._watchers.append(self._future_state_change)
            self._threadPool.start(runnable)
            return f

    def map(self, func, *iterables):
        """
        Submit ``func(*args)`` for every tuple in ``zip(*iterables)`` and
        return an iterator over the results in submission order.

        All calls are submitted immediately, before the returned iterator
        is consumed; iterating blocks on each result in turn and re-raises
        the exception of a failed call.
        """
        futures = [self.submit(func, *args) for args in zip(*iterables)]

        def results():
            for f in futures:
                yield f.result()

        return results()

    def shutdown(self, wait=True):
        """
        Shutdown the executor and free all resources. If `wait` is True then
        wait until all pending futures are executed or cancelled.
        """
        with self._state_lock:
            self._shutdown = True

        if wait:
            # Wait until all futures have completed. This happens before
            # the depot thread is asked to quit because queued Tasks still
            # need the depot's event loop for their `transfer` call.
            for future in list(self._futures):
                try:
                    future.exception()
                except (TimeoutError, CancelledError):
                    pass

        if self._depot_thread is not None:
            QMetaObject.invokeMethod(
                self._depot_thread, "quit", Qt.AutoConnection)

            if wait:
                self._depot_thread.wait()
                self._depot_thread = None

    def _future_state_change(self, future, state):
        # Forget futures once they reach a terminal state. A future becomes
        # either Finished or Canceled, never both, so it is removed once.
        if state in (Future.Finished, Future.Canceled):
            try:
                self._futures.remove(future)
            except ValueError:
                # Not tracked (any more). This runs inside the future's
                # state change; raising here would skip waking its waiters.
                pass
245
246
class ExecuteCallEvent(QEvent):
    """
    Represents an function call from the event loop (used by :class:`Task`
    to schedule the :func:`Task.run` method to be invoked)

    """
    #: Event type id obtained from Qt when the class is defined.
    ExecuteCall = QEvent.registerEventType()

    def __init__(self):
        QEvent.__init__(self, ExecuteCallEvent.ExecuteCall)
257
258
class Task(QObject):
    """
    A unit of work with Qt signals, executed from the event loop of the
    thread the task lives in.

    Either pass a zero-argument `function` or subclass and reimplement
    :func:`run`. The outcome is stored in the task's :class:`Future` and
    also announced through the signals below.
    """
    #: Emitted right before `run` is called.
    started = Signal()
    #: Emitted after `run` returned or raised (not emitted on cancel).
    finished = Signal()
    #: Emitted instead of running when the future was cancelled.
    cancelled = Signal()
    #: Emitted with the return value of `run`.
    resultReady = Signal(object)
    #: Emitted with the exception raised by `run`.
    exceptionReady = Signal(Exception)

    def __init__(self, parent=None, function=None):
        QObject.__init__(self, parent)
        self.function = function

        self._future = Future()

    def run(self):
        """
        Execute the task and return its result.

        The default implementation calls `function`; it raises
        `NotImplementedError` when no function was given.
        """
        if self.function is None:
            raise NotImplementedError
        else:
            return self.function()

    def start(self):
        """
        Schedule the task for execution by posting an
        :class:`ExecuteCallEvent` to itself.
        """
        QCoreApplication.postEvent(self, ExecuteCallEvent())

    def future(self):
        """
        Return the :class:`Future` holding the task's outcome.
        """
        return self._future

    def result(self, timeout=None):
        """
        Shorthand for ``task.future().result(timeout)``.
        """
        return self._future.result(timeout)

    def _execute(self):
        """
        Run the task (invoked from `customEvent`); never raises.
        """
        try:
            if not self._future.set_running_or_notify_cancel():
                self.cancelled.emit()
                return

            self.started.emit()
            try:
                try:
                    result = self.run()
                except BaseException as ex:
                    self._future.set_exception(ex)
                    self.exceptionReady.emit(ex)
                else:
                    self._future.set_result(result)
                    self.resultReady.emit(result)
            finally:
                # Always announce completion once the task was started,
                # even if emitting the result/exception signal failed;
                # otherwise an event loop waiting on `finished` never quits.
                self.finished.emit()
        except BaseException:
            _log.critical("Exception in Task", exc_info=True)

    def customEvent(self, event):
        """
        Reimplemented from `QObject.customEvent`.
        """
        if event.type() == ExecuteCallEvent.ExecuteCall:
            self._execute()
        else:
            QObject.customEvent(self, event)
314
315
def futures_iter(futures):
    """
    Lazily yield ``future.result()`` for every item of `futures`, in order.

    Each step blocks until the corresponding future is done and re-raises
    whatever its ``result()`` call raises.
    """
    for future in futures:
        value = future.result()
        yield value
319
320
class TimeoutError(Exception):
    """Raised by :class:`Future` when a timed wait ends before completion."""
323
324
class CancelledError(Exception):
    """Raised by :class:`Future` when asked for the outcome of a cancelled call."""
327
328
class Future(object):
    """
    A :class:`Future` class represents a result of an asynchronous
    computation.

    State transitions are ``Pending -> Running -> Finished`` or
    ``Pending -> Canceled``. Callables appended to `_watchers` are invoked
    as ``watcher(future, state)`` on every state change, from the thread
    that caused the change and while the future's lock is held.

    """
    Pending, Canceled, Running, Finished = 1, 2, 4, 8

    def __init__(self):
        self._watchers = []
        self._state = Future.Pending
        self._condition = threading.Condition()
        self._result = None
        self._exception = None

    def _set_state(self, state):
        """
        Record `state` and notify the watchers if it differs from the
        current one (the caller must hold `_condition`).
        """
        if self._state != state:
            self._state = state
            for watcher in self._watchers:
                watcher(self, state)

    def cancel(self):
        """
        Attempt to cancel the call. Return `False` if the call is
        already in progress and cannot be canceled, otherwise return `True`.

        """
        with self._condition:
            if self._state in [Future.Running, Future.Finished]:
                return False
            elif self._state == Future.Canceled:
                return True
            else:
                self._set_state(Future.Canceled)
                self._condition.notify_all()

        return True

    def cancelled(self):
        """
        Return `True` if call was successfully cancelled.
        """
        with self._condition:
            return self._state == Future.Canceled

    def done(self):
        """
        Return `True` if the call was successfully cancelled or finished
        running.

        """
        with self._condition:
            return self._state in [Future.Canceled, Future.Finished]

    def running(self):
        """
        Return True if the call is currently being executed.
        """
        with self._condition:
            return self._state == Future.Running

    def _get_result(self):
        """
        Return the stored result or raise the stored exception.
        """
        # Compare against None: an exception instance may itself be falsy
        # (e.g. one defining `__len__`) and must still be raised.
        if self._exception is not None:
            raise self._exception
        else:
            return self._result

    def _wait_finished(self, timeout):
        """
        Wait (the caller must hold `_condition`) until the future is done.

        Returns normally only when the state is Finished.

        :raises CancelledError: if the future was cancelled.
        :raises TimeoutError: if it is still not done after `timeout`.
        """
        if self._state not in (Future.Finished, Future.Canceled):
            self._condition.wait(timeout)

        if self._state == Future.Finished:
            return
        elif self._state == Future.Canceled:
            raise CancelledError()
        else:
            raise TimeoutError()

    def result(self, timeout=None):
        """
        Return the result of the :class:`Futures` computation. If `timeout`
        is `None` the call will block until either the computation finished
        or is cancelled.

        Re-raises the exception raised by the computation, if any.

        :raises CancelledError: if the computation was cancelled.
        :raises TimeoutError: if no outcome is available after `timeout`
            seconds.
        """
        with self._condition:
            self._wait_finished(timeout)
            return self._get_result()

    def exception(self, timeout=None):
        """
        Return the exception instance (if any) resulting from the execution
        of the :class:`Future`. Can raise a :class:`CancelledError` if the
        computation was cancelled.

        :raises TimeoutError: if no outcome is available after `timeout`
            seconds.

        """
        with self._condition:
            self._wait_finished(timeout)
            return self._exception

    def set_result(self, result):
        """
        Set the result of the computation (called by the worker thread).
        """
        with self._condition:
            self._result = result
            self._set_state(Future.Finished)
            self._condition.notify_all()

    def set_exception(self, exception):
        """
        Set the exception instance that was raised by the computation
        (called by the worker thread).

        """
        with self._condition:
            self._exception = exception
            self._set_state(Future.Finished)
            self._condition.notify_all()

    def set_running_or_notify_cancel(self):
        """
        Mark a pending future as running (called by the worker thread).

        Return `True` if the future was pending and is now running, `False`
        if it was cancelled and must not be executed.

        :raises RuntimeError: if the future is already running or finished.
        """
        with self._condition:
            if self._state == Future.Canceled:
                return False
            elif self._state == Future.Pending:
                self._set_state(Future.Running)
                return True
            else:
                raise RuntimeError(
                    "Future in unexpected state: %r" % (self._state,))
468
469
class StateChangedEvent(QEvent):
    """
    Represents a change in the internal state of a :class:`Future`.

    :param state:
        The new state of the future.
    """
    #: Event type id obtained from Qt when the class is defined.
    StateChanged = QEvent.registerEventType()

    def __init__(self, state):
        QEvent.__init__(self, StateChangedEvent.StateChanged)
        self._state = state

    def state(self):
        """
        Return the new state (Future.Pending, Future.Canceled, ...).
        """
        return self._state
485
486
class FutureWatcher(QObject):
    """
    Qt signal front-end for a :class:`Future`.

    The watcher registers itself with the future and re-emits the future's
    state changes as signals.

    :param :class:`Future` future:
        The future to observe.
    :param :class:`QObject` parent:
        Parent of the watcher object.

    """
    #: Emitted when the future enters the cancelled state.
    cancelled = Signal()

    #: Emitted when the future enters the finished state.
    finished = Signal()

    #: Emitted when the future enters the running state.
    started = Signal()

    def __init__(self, future, parent=None):
        QObject.__init__(self, parent)
        self._future = future

        # From now on the future calls `_stateChanged` on every transition.
        future._watchers.append(self._stateChanged)

    def isCancelled(self):
        """
        Return the future's ``cancelled()`` status.
        """
        return self._future.cancelled()

    def isDone(self):
        """
        Return the future's ``done()`` status (cancelled or finished).
        """
        return self._future.done()

    def isRunning(self):
        """
        Return the future's ``running()`` status.
        """
        return self._future.running()

    def isStarted(self):
        """
        Return the future's ``running()`` status.

        This is the same query as `isRunning`.
        """
        return self._future.running()

    def result(self):
        """
        Return ``future.result()`` (blocks until an outcome is available).
        """
        return self._future.result()

    def exception(self):
        """
        Return ``future.exception()``: the exception instance, or `None`
        when the computation raised nothing.
        """
        return self._future.exception()

    def customEvent(self, event):
        """
        Reimplemented from `QObject.customEvent`.

        Translates a :class:`StateChangedEvent` into the matching signal;
        every other event goes to the base class.
        """
        if event.type() != StateChangedEvent.StateChanged:
            return QObject.customEvent(self, event)

        if event.state() == Future.Canceled:
            self.cancelled.emit()
        elif event.state() == Future.Running:
            self.started.emit()
        elif event.state() == Future.Finished:
            self.finished.emit()

    def _stateChanged(self, future, state):
        """
        Watcher callback invoked by the :class:`Future` on a state change.
        """
        notification = StateChangedEvent(state)
        in_own_thread = self.thread() is QThread.currentThread()

        if in_own_thread:
            # Same thread: deliver synchronously.
            QCoreApplication.sendEvent(self, notification)
        else:
            # Foreign thread: queue for the watcher's event loop.
            QCoreApplication.postEvent(self, notification)
574
575
class methodinvoke(object):
    """
    Callable wrapper that invokes a named method of a QObject through
    ``QMetaObject.invokeMethod`` with a queued connection.

    :param obj:
        The QObject instance owning the method.
    :param str method:
        Name of the method to invoke.
    :param tuple arg_types:
        Types of the positional arguments, in order.

    """
    def __init__(self, obj, method, arg_types=()):
        self.obj = obj
        self.method = method
        self.arg_types = tuple(arg_types)

    def __call__(self, *args):
        # Pair every declared type with the value passed for it; `zip`
        # stops at the shorter of the two sequences.
        qargs = []
        for arg_type, value in zip(self.arg_types, args):
            qargs.append(Q_ARG(arg_type, value))

        QMetaObject.invokeMethod(
            self.obj, self.method, Qt.QueuedConnection, *qargs
        )
600
601
602try:
603    import unittest2 as unittest
604except ImportError:
605    import unittest
606
607
class TestFutures(unittest.TestCase):
    """Tests of :class:`Future` state handling, without any threads."""

    def test_futures(self):
        # A pending future is neither done nor running and can be cancelled.
        pending = Future()
        self.assertEqual(pending.done(),  False)
        self.assertEqual(pending.running(),  False)

        self.assertTrue(pending.cancel())
        self.assertTrue(pending.cancelled())

        # Both accessors report the cancellation.
        with self.assertRaises(CancelledError):
            pending.result()

        with self.assertRaises(CancelledError):
            pending.exception()

        # A running future without an outcome times out.
        running = Future()
        running.set_running_or_notify_cancel()

        with self.assertRaises(TimeoutError):
            running.result(0.1)

        with self.assertRaises(TimeoutError):
            running.exception(0.1)

        # A finished future hands back its result and no exception.
        finished = Future()
        finished.set_running_or_notify_cancel()
        finished.set_result("result")

        self.assertEqual(finished.result(), "result")
        self.assertEqual(finished.exception(), None)

        # A failed future re-raises the stored exception from `result`.
        failed = Future()
        failed.set_running_or_notify_cancel()

        failed.set_exception(Exception("foo"))

        with self.assertRaises(Exception):
            failed.result()
647
648
class TestExecutor(unittest.TestCase):
    """Tests of :class:`ThreadExecutor` with plain callables."""

    def setUp(self):
        # Qt allows only one application object per process; reuse the
        # one created by an earlier test if there is one.
        self.app = QCoreApplication.instance()
        if self.app is None:
            self.app = QCoreApplication([])

    def test_executor(self):
        executor = ThreadExecutor()
        f1 = executor.submit(pow, 100, 100)

        f2 = executor.submit(lambda: 1 / 0)

        f3 = executor.submit(QThread.currentThread)

        # assertEqual, not assertTrue: assertTrue would take the second
        # argument as the failure message and never compare the values.
        self.assertEqual(f1.result(), pow(100, 100))

        with self.assertRaises(ZeroDivisionError):
            f2.result()

        self.assertIsInstance(f2.exception(), ZeroDivisionError)

        # The callable must have been executed in a worker thread.
        self.assertIsNot(f3.result(), QThread.currentThread())

    def test_methodinvoke(self):
        executor = ThreadExecutor()
        state = [None, None]

        class StateSetter(QObject):
            @Slot(object)
            def set_state(self, value):
                state[0] = value
                state[1] = QThread.currentThread()

        def func(callback):
            callback(QThread.currentThread())

        obj = StateSetter()
        f1 = executor.submit(func, methodinvoke(obj, "set_state", (object,)))
        f1.result()

        # So invoked method can be called
        QCoreApplication.processEvents()

        self.assertIs(state[1], QThread.currentThread(),
                      "set_state was called from the wrong thread")

        self.assertIsNot(state[0], QThread.currentThread(),
                         "set_state was invoked in the main thread")

        executor.shutdown(wait=True)

    def test_executor_map(self):
        executor = ThreadExecutor()

        r = executor.map(pow, range(1000), range(1000))

        results = list(r)

        self.assertEqual(len(results), 1000)
        # Results come back in submission order.
        self.assertEqual(results, [pow(i, i) for i in range(1000)])
707
708
class TestFutureWatcher(unittest.TestCase):
    """Tests of :class:`FutureWatcher`."""

    def setUp(self):
        # Qt allows only one application object per process; reuse the
        # one created by an earlier test if there is one.
        self.app = QCoreApplication.instance()
        if self.app is None:
            self.app = QCoreApplication([])

    def test_watcher(self):
        executor = ThreadExecutor()
        f = executor.submit(QThread.currentThread)
        watcher = FutureWatcher(f)

        # Cancelling only succeeds if the worker has not picked the call
        # up yet, so the check is conditional.
        if f.cancel():
            self.assertTrue(watcher.isCancelled())

        executor.shutdown()
722
723
class TestTask(unittest.TestCase):
    """Tests of :class:`Task`, run directly and through an executor."""

    def setUp(self):
        # Qt allows only one application object per process; reuse the
        # one created by an earlier test if there is one.
        self.app = QCoreApplication.instance()
        if self.app is None:
            self.app = QCoreApplication([])

    def test_task(self):
        results = []

        # A task living in the current thread runs from its event loop.
        task = Task(function=QThread.currentThread)
        task.resultReady.connect(results.append)

        task.start()
        self.app.processEvents()

        self.assertSequenceEqual(results, [QThread.currentThread()])

        results = []

        thread = QThread()
        thread.start()
        try:
            # A task moved to another thread runs in that thread.
            task = Task(function=QThread.currentThread)

            task.moveToThread(thread)

            self.assertIsNot(task.thread(), QThread.currentThread())
            self.assertIs(task.thread(), thread)

            task.resultReady.connect(results.append, Qt.DirectConnection)
            task.start()

            f = task.future()

            self.assertIsNot(f.result(3), QThread.currentThread())

            self.assertIs(f.result(3), results[-1])
        finally:
            # Stop the helper thread; a QThread must not be destroyed
            # while it is still running.
            thread.quit()
            thread.wait()

    def test_executor(self):
        executor = ThreadExecutor()

        f = executor.submit(QThread.currentThread)

        self.assertIsNot(f.result(3), QThread.currentThread())

        f = executor.submit(lambda: 1 / 0)

        with self.assertRaises(ZeroDivisionError):
            f.result()

        results = []
        task = Task(function=QThread.currentThread)
        task.resultReady.connect(results.append, Qt.DirectConnection)

        f = executor.submit(task)

        self.assertIsNot(f.result(3), QThread.currentThread())

        executor.shutdown()
781
782
783############################################
784# DEPRECATED (not to mention extremely ugly)
785############################################
786
787
class AsyncCall(QObject):
    """
    A wrapper class for async function calls using Qt's signals for
    communication between threads

    Arguments:
        - `func`: a function to execute
        - `args`: positional arguments for `func`
        - `kwargs`: keyword arguments for `func` (default `None`, meaning
          no keyword arguments)
        - `thread`: a QThread instance to execute under (default `None`,
           threadPool is used instead)
        - `threadPool`: a QThreadPool instance to handle thread allocation
           (for this to work `thread` argument must be `None`)
        - `parent`: parent object (should be None for most cases)

    Signals:
        - `starting()`
        - `finished(QString)` - emitted when finished with an argument 'OK'
          on success or repr(ex) on error.
        - `finished(PyQt_PyObject, QString)` - same as above but also
          pass self as an argument.
        - `unhandledException(PyQt_PyObject)` - emitted on error
          with `sys.exc_info()` argument.
        - `resultReady(PyQt_PyObject)` - emitted on success with
          function call result as argument.

    """

    def __init__(self, func=None, args=(), kwargs=None, thread=None,
                 threadPool=None, parent=None):
        QObject.__init__(self, parent)
        self.func = func
        self._args = args
        # Use a fresh dict per instance instead of a shared mutable default.
        self._kwargs = {} if kwargs is None else kwargs
        self.threadPool = None

        self._connected = True
        self._cancelRequested = False
        self._started = False
        self._cancelled = False

        if thread is not None:
            self.moveToThread(thread)
        else:
            if threadPool is None:
                threadPool = QThreadPool.globalInstance()
            self.threadPool = threadPool
            self._runnable = _RunnableAsyncCall(self)
            self.threadPool.start(self._runnable)
            # `moveToAndInit` (invoked by the runnable) sets this to True
            # once the `_async_start()` connection exists.
            self._connected = False
            return

        self.connect(self, SIGNAL("_async_start()"), self.execute,
                     Qt.QueuedConnection)

    @pyqtSignature("execute()")
    def execute(self):
        """ Never call directly, use `__call__` or `apply_async` instead
        """
        assert(self.thread() is QThread.currentThread())
        if self._cancelRequested:
            self._cancelled = True
            self._status = 2
            self.emit(SIGNAL("finished(QString)"), QString("Cancelled"))
            return

        self._started = True
        self.emit(SIGNAL("starting()"))
        try:
            self.result = self.func(*self._args, **self._kwargs)
        except Exception as ex:
            exc_info = sys.exc_info()
            # Same text as the former `print >> sys.stderr, ...` statement
            # (including its doubled spaces).
            sys.stderr.write(
                "Exception in thread  %s  while calling  %s\n" %
                (QThread.currentThread(), self.func)
            )
            self.emit(SIGNAL("finished(QString)"), QString(repr(ex)))
            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
                      self, QString(repr(ex)))
            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"),
                      exc_info)

            self._exc_info = exc_info
            self._status = 1
            return

        self.emit(SIGNAL("finished(QString)"), QString("Ok"))
        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
                  self, QString("Ok"))
        self.emit(SIGNAL("resultReady(PyQt_PyObject)"),
                  self.result)
        self._status = 0

    def __call__(self, *args, **kwargs):
        """ Apply the call with args and kwargs additional arguments
        """
        if args or kwargs:
            # Bind the arguments stored so far and keep the new ones as
            # the arguments of the pending call.
            self.func = partial(self.func, *self._args, **self._kwargs)
            self._args, self._kwargs = args, kwargs

        if not self._connected:
            # delay until event loop initialized by _RunnableAsyncCall
            QTimer.singleShot(50, self.__call__)
            return
        else:
            self.emit(SIGNAL("_async_start()"))

    def apply_async(self, func, args, kwargs):
        """ call function with `args` as positional and `kwargs` as keyword
        arguments (Overrides __init__ arguments).
        """
        self.func, self._args, self._kwargs = func, args, kwargs
        self.__call__()

    def poll(self):
        """ Return the state of execution: `None` while not finished,
        0 on success, 1 on error, 2 if cancelled.
        """
        return getattr(self, "_status", None)

    def join(self, processEvents=True):
        """ Wait until the execution finishes.
        """
        while self.poll() is None:
            QThread.currentThread().msleep(50)
            if processEvents and QThread.currentThread() is qApp.thread():
                qApp.processEvents()

    def get_result(self, processEvents=True):
        """ Block until the computation completes and return the call result.
        If the execution resulted in an exception, this method will re-raise
        it. Raises `RuntimeError` if the call was cancelled.
        """
        self.join(processEvents=processEvents)
        if self.poll() != 0:
            exc_info = getattr(self, "_exc_info", None)
            if exc_info is None:
                # Cancelled: there is no stored exception to re-raise.
                raise RuntimeError("Cancelled")
            # re-raise the error
            raise exc_info[1]
        else:
            return self.result

    def emitAdvance(self, count=1):
        self.emit(SIGNAL("advance()"))
        self.emit(SIGNAL("advance(int)"), count)

    def emitProgressChanged(self, value):
        self.emit(SIGNAL("progressChanged(float)"), value)

    @pyqtSignature("moveToAndExecute(PyQt_PyObject)")
    def moveToAndExecute(self, thread):
        """ Move to `thread`, connect the start signal and start the call.
        """
        self.moveToThread(thread)

        self.connect(self, SIGNAL("_async_start()"), self.execute,
                     Qt.QueuedConnection)

        self.emit(SIGNAL("_async_start()"))

    @pyqtSignature("moveToAndInit(PyQt_PyObject)")
    def moveToAndInit(self, thread):
        """ Move to `thread` and connect the start signal without
        starting the call.
        """
        self.moveToThread(thread)

        self.connect(self, SIGNAL("_async_start()"), self.execute,
                     Qt.QueuedConnection)
        self._connected = True
945
946
class WorkerThread(QThread):
    """ A worker thread that does nothing but run a Qt event loop.
    """
    def run(self):
        # Enter the thread's event loop explicitly.
        self.exec_()
952
953
class _RunnableTask(QRunnable):
    """ Wrapper for an AsyncCall

    Runs `call` in a pool thread. An :class:`AsyncCall` is moved into the
    pool thread and served by a local event loop until it emits
    `finished(QString)`; any other callable is called directly and its
    return value is stored in `_return`.
    """
    def __init__(self, call):
        QRunnable.__init__(self)
        # Turn off auto-deletion of this runnable.
        self.setAutoDelete(False)
        self._call = call

    def run(self):
        if isinstance(self._call, AsyncCall):
            self.eventLoop = QEventLoop()
            self.eventLoop.processEvents()
            # Leave the local event loop once the call reports completion.
            QObject.connect(self._call, SIGNAL("finished(QString)"),
                            lambda str: self.eventLoop.quit())
            # Queued invocation of `moveToAndInit` with the current (pool)
            # thread as its argument.
            QMetaObject.invokeMethod(self._call, "moveToAndInit",
                                     Qt.QueuedConnection,
                                     Q_ARG("PyQt_PyObject",
                                           QThread.currentThread()))
            self.eventLoop.processEvents()
            self.eventLoop.exec_()
        else:
            self._return = self._call()
976
977
class _RunnableAsyncCall(_RunnableTask):
    """ A `_RunnableTask` whose `_call` is always treated as an AsyncCall
    (no `isinstance` check is made).
    """
    def run(self):
        self.eventLoop = QEventLoop()
        self.eventLoop.processEvents()
        # Leave the local event loop once the call reports completion.
        QObject.connect(self._call, SIGNAL("finished(QString)"),
                        lambda str: self.eventLoop.quit())
        # Queued invocation of `moveToAndInit` with the current (pool)
        # thread as its argument.
        QMetaObject.invokeMethod(self._call, "moveToAndInit",
                                 Qt.QueuedConnection,
                                 Q_ARG("PyQt_PyObject",
                                       QThread.currentThread()))
        self.eventLoop.processEvents()
        self.eventLoop.exec_()
990
991
def createTask(call, args=(), kwargs=None, onResult=None, onStarted=None,
               onFinished=None, onError=None, thread=None, threadPool=None):
    """
    Create an :class:`AsyncCall` for ``call(*args, **kwargs)``, connect the
    given callbacks and start it.

    :param call: the callable to execute.
    :param tuple args: positional arguments for `call`.
    :param dict kwargs: keyword arguments for `call` (`None` means none).
    :param onResult: slot for `resultReady(PyQt_PyObject)`.
    :param onStarted: slot for `starting()`.
    :param onFinished: slot for `finished(QString)`.
    :param onError: slot for `unhandledException(PyQt_PyObject)`.
    :param thread: passed on to :class:`AsyncCall`.
    :param threadPool: passed on to :class:`AsyncCall`.
    :returns: the started :class:`AsyncCall` instance.

    All callbacks are connected with `Qt.QueuedConnection`.
    """
    if kwargs is None:
        kwargs = {}

    # Not named `async`: that is a reserved word from Python 3.7 on.
    async_call = AsyncCall(thread=thread, threadPool=threadPool)
    if onResult is not None:
        async_call.connect(async_call, SIGNAL("resultReady(PyQt_PyObject)"),
                           onResult, Qt.QueuedConnection)
    if onStarted is not None:
        async_call.connect(async_call, SIGNAL("starting()"), onStarted,
                           Qt.QueuedConnection)
    if onFinished is not None:
        async_call.connect(async_call, SIGNAL("finished(QString)"),
                           onFinished, Qt.QueuedConnection)
    if onError is not None:
        async_call.connect(async_call,
                           SIGNAL("unhandledException(PyQt_PyObject)"),
                           onError, Qt.QueuedConnection)
    async_call.apply_async(call, args, kwargs)
    return async_call
1009
1010
class synchronized(object):
    """
    Context manager that holds a QMutex stored on an object as `_mutex`.

    When the object has no `_mutex` attribute yet, a ``QMutex(mode)`` is
    created and stored on it, so later `synchronized` instances for the
    same object use the same mutex.
    """
    def __init__(self, object, mode=QMutex.Recursive):
        if hasattr(object, "_mutex"):
            mutex = object._mutex
        else:
            mutex = QMutex(mode)
            object._mutex = mutex
        self.mutex = mutex

    def __enter__(self):
        self.mutex.lock()
        return self

    def __exit__(self, exc_type=None, exc_value=None, tb=None):
        # Only releases the mutex; an exception from the `with` body is
        # not suppressed.
        self.mutex.unlock()
Note: See TracBrowser for help on using the repository browser.