source: orange/Orange/OrangeWidgets/OWConcurrent.py @ 11481:08b42fee6cf6

Revision 11481:08b42fee6cf6, 28.4 KB checked in by Ales Erjavec <ales.erjavec@…>, 12 months ago (diff)

Extended the OWConcurrent module with 'concurrent.futures' like interface.

The old interface is now deprecated.

RevLine 
[8042]1"""\
2OWConcurent
3===========
4
5General helper functions and classes for Orange Canvas
[9603]6concurrent programming
7
[8042]8"""
9from __future__ import with_statement
[11481]10
11import sys
12import threading
13import logging
14
[8042]15from functools import partial
[11481]16from contextlib import contextmanager
[8042]17
[11481]18from PyQt4.QtGui import qApp
19
20from PyQt4.QtCore import (
21    Qt, QObject, QMetaObject, QTimer, QThreadPool, QThread, QMutex,
22    QRunnable, QEventLoop, QCoreApplication, QEvent, QString, SIGNAL,
23    Q_ARG, pyqtSignature,
24)
25
26from PyQt4.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
27
28_log = logging.getLogger(__name__)
29
30
31@contextmanager
32def locked(mutex):
33    """
34    A context manager for locking an instance of a QMutex.
35    """
36    mutex.lock()
37    try:
38        yield
39    finally:
40        mutex.unlock()
41
42
43class _TaskDepotThread(QThread):
44    """
45    A special 'depot' thread used to transfer Task instance into threads
46    started by a QThreadPool.
47
48    """
49    def start(self):
50        """
51        Reimplemented from `QThread.start`
52        """
53        QThread.start(self)
54        # Need to also handle method invoke from this thread
55        self.moveToThread(self)
56
57    def run(self):
58        """
59        Reimplemented from `QThread.run`
60        """
61        # Start the event loop.
62        # On some old Qt4/PyQt4 installations base QThread.run does not seem
63        # to enter the loop, despite being documented to do so.
64        self.exec_()
65
66    @Slot(object, object)
67    def transfer(self, obj, thread):
68        """
69        Transfer `obj` (:class:`QObject`) instance from this thread to the
70        target `thread` (a :class:`QThread`).
71
72        """
73        assert obj.thread() is self
74        assert QThread.currentThread() is self
75        obj.moveToThread(thread)
76
77
78class _TaskRunnable(QRunnable):
79    """
80    A QRunnable for running a :class:`Task` by a :class:`ThreadExecuter`.
81    """
82    def __init__(self, future, task, args, kwargs):
83        QRunnable.__init__(self)
84        self.future = future
85        self.task = task
86        self.args = args
87        self.kwargs = kwargs
88        self.eventLoop = None
89
90    def run(self):
91        """
92        Reimplemented from `QRunnable.run`
93        """
94        self.eventLoop = QEventLoop()
95        self.eventLoop.processEvents()
96
97        # Move the task to the current thread so it's events, signals, slots
98        # are triggered from this thread.
99        assert isinstance(self.task.thread(), _TaskDepotThread)
100        QMetaObject.invokeMethod(
101            self.task.thread(), "transfer", Qt.BlockingQueuedConnection,
102            Q_ARG(object, self.task),
103            Q_ARG(object, QThread.currentThread())
104        )
105
106        self.eventLoop.processEvents()
107
108        # Schedule task.run from the event loop.
109        self.task.start()
110
111        # Quit the loop and exit when task finishes or is cancelled.
112        # TODO: If the task encounters an critical error it might not emit
113        # these signals and this Runnable will never complete.
114        self.task.finished.connect(self.eventLoop.quit)
115        self.task.cancelled.connect(self.eventLoop.quit)
116        self.eventLoop.exec_()
117
118
119class _Runnable(QRunnable):
120    """
121    A QRunnable for running plain functions by a :class:`ThreadExecuter`.
122    """
123    def __init__(self, future, func, args, kwargs):
124        QRunnable.__init__(self)
125        self.future = future
126        self.func = func
127        self.args = args
128        self.kwargs = kwargs
129
130    def run(self):
131        """
132        Reimplemented from QRunnable.run
133        """
134        try:
135            if not self.future.set_running_or_notify_cancel():
136                # Was cancelled
137                return
138            try:
139                result = self.func(*self.args, **self.kwargs)
140            except BaseException, ex:
141                self.future.set_exception(ex)
142            else:
143                self.future.set_result(result)
144        except BaseException:
145            _log.critical("Exception in worker thread.", exc_info=True)
146
147
148class ThreadExecutor(QObject):
149    """
150    ThreadExceuter object class provides an interface for running tasks
151    in a thread pool.
152
153    :param QObject parent:
154        Executor's parent instance.
155
156    :param QThreadPool threadPool:
157        Thread pool to be used by the instance of the Executor. If `None`
158        then ``QThreadPool.globalInstance()`` will be used.
159
160    """
161    def __init__(self, parent=None, threadPool=None):
162        QObject.__init__(self, parent)
163        if threadPool is None:
164            threadPool = QThreadPool.globalInstance()
165        self._threadPool = threadPool
166        self._depot_thread = None
167
168    def _get_depot_thread(self):
169        if self._depot_thread is None:
170            self._depot_thread = _TaskDepotThread()
171            self._depot_thread.start()
172
173        return self._depot_thread
174
175    def submit(self, func, *args, **kwargs):
176        """
177        Schedule the `func(*args, **kwargs)` to be executed and return an
178        :class:`Future` instance representing the result of the computation.
179
180        """
181        if isinstance(func, Task):
182            if func.thread() is not QThread.currentThread():
183                raise ValueError("Can only submit Tasks from it's own thread.")
184
185            if func.parent() is not None:
186                raise ValueError("Can not submit Tasks with a parent.")
187
188            func.moveToThread(self._get_depot_thread())
189            assert func.thread() is self._get_depot_thread()
190            # Use the Task's own Future object
191            f = func.future()
192            runnable = _TaskRunnable(f, func, args, kwargs)
193        else:
194            f = Future()
195            runnable = _Runnable(f, func, args, kwargs)
196        self._threadPool.start(runnable)
197
198        return f
199
200    def map(self, func, *iterables):
201        futures = [self.submit(func, *args) for args in zip(*iterables)]
202
203        for f in futures:
204            yield f.result()
205
206    def shutdown(self, wait=True):
207        """
208        Shutdown the executor and free all resources. If `wait` is True then
209        wait until all pending futures are executed or cancelled.
210        """
211        if self._depot_thread is not None:
212            QMetaObject.invokeMethod(
213                self._depot_thread, "quit", Qt.AutoConnection)
214
215        if wait:
216            self._threadPool.waitForDone()
217            if self._depot_thread:
218                self._depot_thread.wait()
219                self._depot_thread = None
220
221
222class ExecuteCallEvent(QEvent):
223    """
224    Represents an function call from the event loop (used by :class:`Task`
225    to schedule the :func:`Task.run` method to be invoked)
226
227    """
228    ExecuteCall = QEvent.registerEventType()
229
230    def __init__(self):
231        QEvent.__init__(self, ExecuteCallEvent.ExecuteCall)
232
233
234class Task(QObject):
235    """
236    """
237    started = Signal()
238    finished = Signal()
239    cancelled = Signal()
240    resultReady = Signal(object)
241    exceptionReady = Signal(Exception)
242
243    def __init__(self, parent=None, function=None):
244        QObject.__init__(self, parent)
245        self.function = function
246
247        self._future = Future()
248
249    def run(self):
250        if self.function is None:
251            raise NotImplementedError
252        else:
253            return self.function()
254
255    def start(self):
256        QCoreApplication.postEvent(self, ExecuteCallEvent())
257
258    def future(self):
259        return self._future
260
261    def result(self, timeout=None):
262        return self._future.result(timeout)
263
264    def _execute(self):
265        try:
266            if not self._future.set_running_or_notify_cancel():
267                self.cancelled.emit()
268                return
269
270            self.started.emit()
271            try:
272                result = self.run()
273            except BaseException, ex:
274                self._future.set_exception(ex)
275                self.exceptionReady.emit(ex)
276            else:
277                self._future.set_result(result)
278                self.resultReady.emit(result)
279
280            self.finished.emit()
281        except BaseException:
282            _log.critical("Exception in Task", exc_info=True)
283
284    def customEvent(self, event):
285        if event.type() == ExecuteCallEvent.ExecuteCall:
286            self._execute()
287        else:
288            QObject.customEvent(self, event)
289
290
291def futures_iter(futures):
292    for f in futures:
293        yield f.result()
294
295
296class TimeoutError(Exception):
297    pass
298
299
300class CancelledError(Exception):
301    pass
302
303
304class Future(object):
305    """
306    A :class:`Future` class represents a result of an asynchronous
307    computation.
308
309    """
310    Pending, Canceled, Running, Finished = 1, 2, 4, 8
311
312    def __init__(self):
313        self._watchers = []
314        self._state = Future.Pending
315        self._condition = threading.Condition()
316        self._result = None
317        self._exception = None
318
319    def _set_state(self, state):
320        if self._state != state:
321            self._state = state
322            for w in self._watchers:
323                w(self, state)
324
325    def cancel(self):
326        """
327        Attempt to cancel the the call. Return `False` if the call is
328        already in progress and cannot be canceled, otherwise return `True`.
329
330        """
331        with self._condition:
332            if self._state in [Future.Running, Future.Finished]:
333                return False
334            elif self._state == Future.Canceled:
335                return True
336            else:
337                self._state = Future.Canceled
338                self._condition.notify_all()
339
340        return True
341
342    def cancelled(self):
343        """
344        Return `True` if call was successfully cancelled.
345        """
346        with self._condition:
347            return self._state == Future.Canceled
348
349    def done(self):
350        """
351        Return `True` if the call was successfully cancelled or finished
352        running.
353
354        """
355        with self._condition:
356            return self._state in [Future.Canceled, Future.Finished]
357
358    def running(self):
359        """
360        Return True if the call is currently being executed.
361        """
362        with self._condition:
363            return self._state == Future.Running
364
365    def _get_result(self):
366        if self._exception:
367            raise self._exception
368        else:
369            return self._result
370
371    def result(self, timeout=None):
372        """
373        Return the result of the :class:`Futures` computation. If `timeout`
374        is `None` the call will block until either the computation finished
375        or is cancelled.
376        """
377        with self._condition:
378            if self._state == Future.Finished:
379                return self._get_result()
380            elif self._state == Future.Canceled:
381                raise CancelledError()
382
383            self._condition.wait(timeout)
384
385            if self._state == Future.Finished:
386                return self._get_result()
387            elif self._state == Future.Canceled:
388                raise CancelledError()
389            else:
390                raise TimeoutError()
391
392    def exception(self, timeout=None):
393        """
394        Return the exception instance (if any) resulting from the execution
395        of the :class:`Future`. Can raise a :class:`CancelledError` if the
396        computation was cancelled.
397
398        """
399        with self._condition:
400            if self._state == Future.Finished:
401                return self._exception
402            elif self._state == Future.Canceled:
403                raise CancelledError()
404
405            self._condition.wait(timeout)
406
407            if self._state == Future.Finished:
408                return self._exception
409            elif self._state == Future.Canceled:
410                raise CancelledError()
411            else:
412                raise TimeoutError()
413
414    def set_result(self, result):
415        """
416        Set the result of the computation (called by the worker thread).
417        """
418        with self._condition:
419            self._result = result
420            self._state = Future.Finished
421            self._condition.notify_all()
422
423    def set_exception(self, exception):
424        """
425        Set the exception instance that was raised by the computation
426        (called by the worker thread).
427
428        """
429        with self._condition:
430            self._exception = exception
431            self._state = Future.Finished
432            self._condition.notify_all()
433
434    def set_running_or_notify_cancel(self):
435        with self._condition:
436            if self._state == Future.Canceled:
437                return False
438            elif self._state == Future.Pending:
439                self._state = Future.Running
440                return True
441            else:
442                raise Exception()
443
444
445class StateChangedEvent(QEvent):
446    """
447    Represents a change in the internal state of a :class:`Future`.
448    """
449    StateChanged = QEvent.registerEventType()
450
451    def __init__(self, state):
452        QEvent.__init__(self, StateChangedEvent.StateChanged)
453        self._state = state
454
455    def state(self):
456        """
457        Return the new state (Future.Pending, Future.Cancelled, ...).
458        """
459        return self._state
460
461
462class FutureWatcher(QObject):
463    """
464    A `FutureWatcher` class provides a convenient interface to the
465    :class:`Future` instance using Qt's signals.
466
467    :param :class:`Future` future:
468        A :class:`Future` instance to watch.
469    :param :class:`QObject` parent:
470        Object's parent instance.
471
472    """
473    #: The future was cancelled.
474    cancelled = Signal()
475
476    #: The future has finished.
477    finished = Signal()
478
479    #: The future has started computation.
480    started = Signal()
481
482    def __init__(self, future, parent=None):
483        QObject.__init__(self, parent)
484        self._future = future
485
486        self._future._watchers.append(self._stateChanged)
487
488    def isCancelled(self):
489        """
490        Was the future cancelled.
491        """
492        return self._future.cancelled()
493
494    def isDone(self):
495        """
496        Is the future done (was cancelled or has finished).
497        """
498        return self._future.done()
499
500    def isRunning(self):
501        """
502        Is the future running (i.e. has started).
503        """
504        return self._future.running()
505
506    def isStarted(self):
507        """
508        Has the future computation started.
509        """
510        return self._future.running()
511
512    def result(self):
513        """
514        Return the result of the computation.
515        """
516        return self._future.result()
517
518    def exception(self):
519        """
520        Return the exception instance or `None` if no exception was raised.
521        """
522        return self._future.exception()
523
524    def customEvent(self, event):
525        """
526        Reimplemented from `QObject.customEvent`.
527        """
528        if event.type() == StateChangedEvent.StateChanged:
529            if event.state() == Future.Canceled:
530                self.cancelled.emit()
531            elif event.state() == Future.Running:
532                self.started.emit()
533            elif event.state() == Future.Finished:
534                self.finished.emit()
535            return
536
537        return QObject.customEvent(self, event)
538
539    def _stateChanged(self, future, state):
540        """
541        The `future` state has changed (called by :class:`Future`).
542        """
543        ev = StateChangedEvent(state)
544
545        if self.thread() is QThread.currentThread():
546            QCoreApplication.sendEvent(self, ev)
547        else:
548            QCoreApplication.postEvent(self, ev)
549
550
551class methodinvoke(object):
552    """
553    Create an QObject method wrapper that invokes the method asynchronously
554    in the object's own thread.
555
556    :param obj:
557        A QObject instance.
558    :param str method:
559        The method name.
560    :param tuple arg_types:
561        A tuple of positional argument types.
562
563    """
564    def __init__(self, obj, method, arg_types=()):
565        self.obj = obj
566        self.method = method
567        self.arg_types = tuple(arg_types)
568
569    def __call__(self, *args):
570        args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)]
571        QMetaObject.invokeMethod(
572            self.obj, self.method, Qt.QueuedConnection,
573            *args
574        )
575
576
577try:
578    import unittest2 as unittest
579except ImportError:
580    import unittest
581
582
583class TestFutures(unittest.TestCase):
584
585    def test_futures(self):
586        f = Future()
587        self.assertEqual(f.done(),  False)
588        self.assertEqual(f.running(),  False)
589
590        self.assertTrue(f.cancel())
591        self.assertTrue(f.cancelled())
592
593        with self.assertRaises(CancelledError):
594            f.result()
595
596        with self.assertRaises(CancelledError):
597            f.exception()
598
599        f = Future()
600        f.set_running_or_notify_cancel()
601
602        with self.assertRaises(TimeoutError):
603            f.result(0.1)
604
605        with self.assertRaises(TimeoutError):
606            f.exception(0.1)
607
608        f = Future()
609        f.set_running_or_notify_cancel()
610        f.set_result("result")
611
612        self.assertEqual(f.result(), "result")
613        self.assertEqual(f.exception(), None)
614
615        f = Future()
616        f.set_running_or_notify_cancel()
617
618        f.set_exception(Exception("foo"))
619
620        with self.assertRaises(Exception):
621            f.result()
622
623
624class TestExecutor(unittest.TestCase):
625
626    def setUp(self):
627        self.app = QCoreApplication([])
628
629    def test_executor(self):
630        executor = ThreadExecutor()
631        f1 = executor.submit(pow, 100, 100)
632
633        f2 = executor.submit(lambda: 1 / 0)
634
635        f3 = executor.submit(QThread.currentThread)
636
637        self.assertTrue(f1.result(), pow(100, 100))
638
639        with self.assertRaises(ZeroDivisionError):
640            f2.result()
641
642        self.assertIsInstance(f2.exception(), ZeroDivisionError)
643
644        self.assertIsNot(f3.result(), QThread.currentThread())
645
646    def test_methodinvoke(self):
647        executor = ThreadExecutor()
648        state = [None, None]
649
650        class StateSetter(QObject):
651            @Slot(object)
652            def set_state(self, value):
653                state[0] = value
654                state[1] = QThread.currentThread()
655
656        def func(callback):
657            callback(QThread.currentThread())
658
659        obj = StateSetter()
660        f1 = executor.submit(func, methodinvoke(obj, "set_state", (object,)))
661        f1.result()
662
663        # So invoked method can be called
664        QCoreApplication.processEvents()
665
666        self.assertIs(state[1], QThread.currentThread(),
667                      "set_state was called from the wrong thread")
668
669        self.assertIsNot(state[0], QThread.currentThread(),
670                         "set_state was invoked in the main thread")
671
672        executor.shutdown(wait=True)
673
674    def test_executor_map(self):
675        executor = ThreadExecutor()
676
677        r = executor.map(pow, range(1000), range(1000))
678
679        results = list(r)
680
681        self.assertTrue(len(results) == 1000)
682
683
684class TestFutureWatcher(unittest.TestCase):
685    def setUp(self):
686        self.app = QCoreApplication([])
687
688    def test_watcher(self):
689        executor = ThreadExecutor()
690        f = executor.submit(QThread.currentThread)
691        watcher = FutureWatcher(f)
692
693        if f.cancel():
694            self.assertTrue(watcher.isCancelled())
695
696        executor.shutdown()
697
698
699class TestTask(unittest.TestCase):
700    def setUp(self):
701        self.app = QCoreApplication([])
702
703    def test_task(self):
704        results = []
705
706        task = Task(function=QThread.currentThread)
707        task.resultReady.connect(results.append)
708
709        task.start()
710        self.app.processEvents()
711
712        self.assertSequenceEqual(results, [QThread.currentThread()])
713
714        results = []
715
716        thread = QThread()
717        thread.start()
718
719        task = Task(function=QThread.currentThread)
720
721        task.moveToThread(thread)
722
723        self.assertIsNot(task.thread(), QThread.currentThread())
724        self.assertIs(task.thread(), thread)
725
726        task.resultReady.connect(results.append, Qt.DirectConnection)
727        task.start()
728
729        f = task.future()
730
731        self.assertIsNot(f.result(3), QThread.currentThread())
732
733        self.assertIs(f.result(3), results[-1])
734
735    def test_executor(self):
736        executor = ThreadExecutor()
737
738        f = executor.submit(QThread.currentThread)
739
740        self.assertIsNot(f.result(3), QThread.currentThread())
741
742        f = executor.submit(lambda: 1 / 0)
743
744        with self.assertRaises(ZeroDivisionError):
745            f.result()
746
747        results = []
748        task = Task(function=QThread.currentThread)
749        task.resultReady.connect(results.append, Qt.DirectConnection)
750
751        f = executor.submit(task)
752
753        self.assertIsNot(f.result(3), QThread.currentThread())
754
755        executor.shutdown()
756
757
758############################################
759# DEPRECATED (not to mention extremely ugly)
760############################################
761
[8042]762
763class AsyncCall(QObject):
[11481]764    """
765    A wrapper class for async function calls using Qt's signals for
766    communication between threads
[8042]767
768    Arguments:
769        - `func`: a function to execute
770        - `thread`: a QThread instance to execute under (default `None`,
771           threadPool is used instead)
772        - `threadPool`: a QThreadPool instance to handle thread allocation
773           (for this to work `thread` argument must be `None`)
774        - `parent`: parent object (should be None for most cases)
[11481]775
[8042]776    Signals:
777        - `starting()`
[11481]778        - `finished(QString)` - emitted when finished with an argument 'OK'
779          on success or repr(ex) on error.
780        - `finished(PyQt_PyObject, QString)` - same as above but also
781          pass self as an argument.
782        - `unhandledException(PyQt_PyObject)` - emitted on error
783          with `sys.exc_info()` argument.
784        - `resultReady(PyQt_PyObject)` - emitted on success with
785          function call result as argument.
786
[8042]787    """
[11481]788
789    def __init__(self, func=None, args=(), kwargs={}, thread=None,
790                 threadPool=None, parent=None):
[8042]791        QObject.__init__(self, parent)
792        self.func = func
793        self._args = args
794        self._kwargs = kwargs
795        self.threadPool = None
[11481]796
[9603]797        self._connected = True
798        self._cancelRequested = False
799        self._started = False
800        self._cancelled = False
[11481]801
[8042]802        if thread is not None:
803            self.moveToThread(thread)
804        else:
805            if threadPool is None:
806                threadPool = QThreadPool.globalInstance()
807            self.threadPool = threadPool
[11481]808            self._runnable = _RunnableAsyncCall(self)
[8042]809            self.threadPool.start(self._runnable)
810            self._connected = False
811            return
812
[11481]813        self.connect(self, SIGNAL("_async_start()"), self.execute,
814                     Qt.QueuedConnection)
[8042]815
816    @pyqtSignature("execute()")
817    def execute(self):
818        """ Never call directly, use `__call__` or `apply_async` instead
819        """
820        assert(self.thread() is QThread.currentThread())
[9603]821        if self._cancelRequested:
822            self._cancelled = True
823            self._status = 2
824            self.emit(SIGNAL("finished(QString)"), QString("Cancelled"))
825            return
[11481]826
[9603]827        self._started = True
[8042]828        self.emit(SIGNAL("starting()"))
829        try:
[11481]830            self.result = self.func(*self._args, **self._kwargs)
[8042]831        except Exception, ex:
[11481]832            print >> sys.stderr, "Exception in thread ", \
833                  QThread.currentThread(), " while calling ", self.func
[8042]834            self.emit(SIGNAL("finished(QString)"), QString(repr(ex)))
[11481]835            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
836                      self, QString(repr(ex)))
837            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"),
838                      sys.exc_info())
[8042]839
840            self._exc_info = sys.exc_info()
841            self._status = 1
842            return
843
844        self.emit(SIGNAL("finished(QString)"), QString("Ok"))
[11481]845        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"),
846                  self, QString("Ok"))
847        self.emit(SIGNAL("resultReady(PyQt_PyObject)"),
848                  self.result)
[8042]849        self._status = 0
850
851    def __call__(self, *args, **kwargs):
852        """ Apply the call with args and kwargs additional arguments
853        """
854        if args or kwargs:
855            self.func = partial(self.func, *self._args, **self._kwargs)
856            self._args, self._kwargs = args, kwargs
[11481]857
[8042]858        if not self._connected:
[11481]859            # delay until event loop initialized by _RunnableAsyncCall
860            QTimer.singleShot(50, self.__call__)
[8042]861            return
862        else:
863            self.emit(SIGNAL("_async_start()"))
864
865    def apply_async(self, func, args, kwargs):
866        """ call function with `args` as positional and `kwargs` as keyword
867        arguments (Overrides __init__ arguments).
868        """
869        self.func, self._args, self._kwargs = func, args, kwargs
870        self.__call__()
871
872    def poll(self):
873        """ Return the state of execution.
874        """
875        return getattr(self, "_status", None)
[11481]876
[8042]877    def join(self, processEvents=True):
878        """ Wait until the execution finishes.
879        """
880        while self.poll() is None:
881            QThread.currentThread().msleep(50)
882            if processEvents and QThread.currentThread() is qApp.thread():
883                qApp.processEvents()
[11481]884
[8042]885    def get_result(self, processEvents=True):
886        """ Block until the computation completes and return the call result.
887        If the execution resulted in an exception, this method will re-raise
[11481]888        it.
[8042]889        """
890        self.join(processEvents=processEvents)
[11481]891        if self.poll() != 0:
[8042]892            # re-raise the error
893            raise self._exc_info[0], self._exc_info[1]
894        else:
895            return self.result
[11481]896
[8042]897    def emitAdvance(self, count=1):
898        self.emit(SIGNAL("advance()"))
899        self.emit(SIGNAL("advance(int)"), count)
[11481]900
[8042]901    def emitProgressChanged(self, value):
902        self.emit(SIGNAL("progressChanged(float)"), value)
[11481]903
[8042]904    @pyqtSignature("moveToAndExecute(PyQt_PyObject)")
905    def moveToAndExecute(self, thread):
906        self.moveToThread(thread)
[11481]907
908        self.connect(self, SIGNAL("_async_start()"), self.execute,
909                     Qt.QueuedConnection)
910
[8042]911        self.emit(SIGNAL("_async_start()"))
[11481]912
[8042]913    @pyqtSignature("moveToAndInit(PyQt_PyObject)")
914    def moveToAndInit(self, thread):
915        self.moveToThread(thread)
[11481]916
917        self.connect(self, SIGNAL("_async_start()"), self.execute,
918                     Qt.QueuedConnection)
[8042]919        self._connected = True
[11481]920
[8042]921
922class WorkerThread(QThread):
923    """ A worker thread
924    """
925    def run(self):
926        self.exec_()
[11481]927
928
929class _RunnableTask(QRunnable):
[8042]930    """ Wrapper for an AsyncCall
931    """
932    def __init__(self, call):
933        QRunnable.__init__(self)
934        self.setAutoDelete(False)
935        self._call = call
[11481]936
[8042]937    def run(self):
938        if isinstance(self._call, AsyncCall):
939            self.eventLoop = QEventLoop()
940            self.eventLoop.processEvents()
[11481]941            QObject.connect(self._call, SIGNAL("finished(QString)"),
942                            lambda str: self.eventLoop.quit())
943            QMetaObject.invokeMethod(self._call, "moveToAndInit",
944                                     Qt.QueuedConnection,
945                                     Q_ARG("PyQt_PyObject",
946                                           QThread.currentThread()))
[8042]947            self.eventLoop.processEvents()
948            self.eventLoop.exec_()
949        else:
950            self._return = self._call()
[11481]951
952
953class _RunnableAsyncCall(_RunnableTask):
[8042]954    def run(self):
955        self.eventLoop = QEventLoop()
956        self.eventLoop.processEvents()
[11481]957        QObject.connect(self._call, SIGNAL("finished(QString)"),
958                        lambda str: self.eventLoop.quit())
959        QMetaObject.invokeMethod(self._call, "moveToAndInit",
960                                 Qt.QueuedConnection,
961                                 Q_ARG("PyQt_PyObject",
962                                       QThread.currentThread()))
[8042]963        self.eventLoop.processEvents()
964        self.eventLoop.exec_()
965
[11481]966
967def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None,
968               onFinished=None, onError=None, thread=None, threadPool=None):
[8042]969    async = AsyncCall(thread=thread, threadPool=threadPool)
970    if onResult is not None:
[11481]971        async.connect(async, SIGNAL("resultReady(PyQt_PyObject)"), onResult,
972                      Qt.QueuedConnection)
[8042]973    if onStarted is not None:
[11481]974        async.connect(async, SIGNAL("starting()"), onStarted,
975                      Qt.QueuedConnection)
[8042]976    if onFinished is not None:
[11481]977        async.connect(async, SIGNAL("finished(QString)"), onFinished,
978                      Qt.QueuedConnection)
[8042]979    if onError is not None:
[11481]980        async.connect(async, SIGNAL("unhandledException(PyQt_PyObject)"),
981                      onError, Qt.QueuedConnection)
[8042]982    async.apply_async(call, args, kwargs)
983    return async
984
[11481]985
[8042]986class synchronized(object):
987    def __init__(self, object, mode=QMutex.Recursive):
988        if not hasattr(object, "_mutex"):
989            object._mutex = QMutex(mode)
990        self.mutex = object._mutex
[11481]991
[8042]992    def __enter__(self):
993        self.mutex.lock()
994        return self
[11481]995
[8042]996    def __exit__(self, exc_type=None, exc_value=None, tb=None):
997        self.mutex.unlock()
Note: See TracBrowser for help on using the repository browser.