source: orange/orange/OrangeWidgets/OWConcurrent.py @ 9603:96f043b17452

Revision 9603:96f043b17452, 11.7 KB checked in by ales_erjavec, 2 years ago (diff)

Added support task canceling (before it is run).

Line 
1"""\
2OWConcurent
3===========
4
5General helper functions and classes for Orange Canvas
6concurrent programming
7
8"""
9from __future__ import with_statement
10from functools import partial
11
12   
13from OWWidget import *
14
15class AsyncCall(QObject):
16    """ A wrapper class for async function calls using
17    Qt's signals for communication with GUI thread
18
19    Arguments:
20        - `func`: a function to execute
21        - `thread`: a QThread instance to execute under (default `None`,
22           threadPool is used instead)
23        - `threadPool`: a QThreadPool instance to handle thread allocation
24           (for this to work `thread` argument must be `None`)
25        - `parent`: parent object (should be None for most cases)
26       
27    Signals:
28        - `starting()`
29        - `finished(QString)` - emited when finished with an argument 'OK' on success or repr(ex) on error
30        - `finished(PyQt_PyObject, QString)` - same as above but also pass self as an argument
31        - `unhandledException(PyQt_PyObject)` - emited on error with `sys.exc_info()` argument
32        - `resultReady(PyQt_PyObject)` - emited on success with function call result as argument
33    """
34    def __init__(self, func=None, args=(), kwargs={}, thread=None, threadPool=None, parent=None):
35        QObject.__init__(self, parent)
36        self.func = func
37        self._args = args
38        self._kwargs = kwargs
39        self.threadPool = None
40       
41        self._connected = True
42        self._cancelRequested = False
43        self._started = False
44        self._cancelled = False
45       
46        if thread is not None:
47            self.moveToThread(thread)
48        else:
49            if threadPool is None:
50                threadPool = QThreadPool.globalInstance()
51            self.threadPool = threadPool
52            self._runnable = RunnableAsyncCall(self)
53            self.threadPool.start(self._runnable)
54            self._connected = False
55            return
56           
57        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
58       
59
60
61    @pyqtSignature("execute()")
62    def execute(self):
63        """ Never call directly, use `__call__` or `apply_async` instead
64        """
65        assert(self.thread() is QThread.currentThread())
66        if self._cancelRequested:
67            self._cancelled = True
68            self._status = 2
69            self.emit(SIGNAL("finished(QString)"), QString("Cancelled"))
70            return
71        self._started = True
72        self.emit(SIGNAL("starting()"))
73        try:
74            self.result  = self.func(*self._args, **self._kwargs)
75        except Exception, ex:
76            print >> sys.stderr, "Exception in thread ", QThread.currentThread(), " while calling ", self.func
77            self.emit(SIGNAL("finished(QString)"), QString(repr(ex)))
78            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString(repr(ex)))
79            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"), sys.exc_info())
80
81            self._exc_info = sys.exc_info()
82            self._status = 1
83            return
84
85        self.emit(SIGNAL("finished(QString)"), QString("Ok"))
86        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString("Ok"))
87        self.emit(SIGNAL("resultReady(PyQt_PyObject)"), self.result)
88        self._status = 0
89
90
91    def __call__(self, *args, **kwargs):
92        """ Apply the call with args and kwargs additional arguments
93        """
94        if args or kwargs:
95            self.func = partial(self.func, *self._args, **self._kwargs)
96            self._args, self._kwargs = args, kwargs
97           
98        if not self._connected:
99            QTimer.singleShot(50, self.__call__) # delay until event loop initialized by RunnableAsyncCall
100            return
101        else:
102            self.emit(SIGNAL("_async_start()"))
103
104
105    def apply_async(self, func, args, kwargs):
106        """ call function with `args` as positional and `kwargs` as keyword
107        arguments (Overrides __init__ arguments).
108        """
109        self.func, self._args, self._kwargs = func, args, kwargs
110        self.__call__()
111
112
113    def poll(self):
114        """ Return the state of execution.
115        """
116        return getattr(self, "_status", None)
117   
118   
119    def join(self, processEvents=True):
120        """ Wait until the execution finishes.
121        """
122        while self.poll() is None:
123            QThread.currentThread().msleep(50)
124            if processEvents and QThread.currentThread() is qApp.thread():
125                qApp.processEvents()
126               
127    def get_result(self, processEvents=True):
128        """ Block until the computation completes and return the call result.
129        If the execution resulted in an exception, this method will re-raise
130        it.
131        """
132        self.join(processEvents=processEvents)
133        if self.poll() != 0: 
134            # re-raise the error
135            raise self._exc_info[0], self._exc_info[1]
136        else:
137            return self.result
138   
139    def emitAdvance(self, count=1):
140        self.emit(SIGNAL("advance()"))
141        self.emit(SIGNAL("advance(int)"), count)
142       
143       
144    def emitProgressChanged(self, value):
145        self.emit(SIGNAL("progressChanged(float)"), value)
146       
147   
148    @pyqtSignature("moveToAndExecute(PyQt_PyObject)")
149    def moveToAndExecute(self, thread):
150        self.moveToThread(thread)
151       
152        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
153       
154        self.emit(SIGNAL("_async_start()"))
155       
156       
157    @pyqtSignature("moveToAndInit(PyQt_PyObject)")
158    def moveToAndInit(self, thread):
159        self.moveToThread(thread)
160       
161        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
162        self._connected = True
163       
164
165class WorkerThread(QThread):
166    """ A worker thread
167    """
168    def run(self):
169        self.exec_()
170       
171       
172class RunnableTask(QRunnable):
173    """ Wrapper for an AsyncCall
174    """
175    def __init__(self, call):
176        QRunnable.__init__(self)
177        self.setAutoDelete(False)
178        self._call = call
179       
180    def run(self):
181        if isinstance(self._call, AsyncCall):
182            self.eventLoop = QEventLoop()
183            self.eventLoop.processEvents()
184            QObject.connect(self._call, SIGNAL("finished(QString)"), lambda str: self.eventLoop.quit())
185            QMetaObject.invokeMethod(self._call, "moveToAndInit", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", QThread.currentThread()))
186            self.eventLoop.processEvents()
187            self.eventLoop.exec_()
188        else:
189            self._return = self._call()
190           
191           
192class RunnableAsyncCall(RunnableTask):
193    def run(self):
194        self.eventLoop = QEventLoop()
195        self.eventLoop.processEvents()
196        QObject.connect(self._call, SIGNAL("finished(QString)"), lambda str: self.eventLoop.quit())
197        QMetaObject.invokeMethod(self._call, "moveToAndInit", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", QThread.currentThread()))
198        self.eventLoop.processEvents()
199        self.eventLoop.exec_()
200
201def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None, onFinished=None, onError=None, thread=None, threadPool=None):
202    async = AsyncCall(thread=thread, threadPool=threadPool)
203    if onResult is not None:
204        async.connect(async, SIGNAL("resultReady(PyQt_PyObject)"), onResult, Qt.QueuedConnection)
205    if onStarted is not None:
206        async.connect(async, SIGNAL("starting()"), onStarted, Qt.QueuedConnection)
207    if onFinished is not None:
208        async.connect(async, SIGNAL("finished(QString)"), onFinished, Qt.QueuedConnection)
209    if onError is not None:
210        async.connect(async, SIGNAL("unhandledException(PyQt_PyObject)"), onError, Qt.QueuedConnection)
211    async.apply_async(call, args, kwargs)
212    return async
213       
214from functools import partial
215       
216class ProgressBar(QObject):
217    """ A thread safe progress callback using Qt's signal mechanism
218    to deliver progress updates to the GUI thread. Make sure this object instance
219    is created in the GUI thread or is a child of an object from the GUI thread
220    """
221   
222    def __init__(self, widget, iterations, parent=None):
223        QObject.__init__(self, parent)
224        assert (qApp.thread() is self.thread())
225        self.iter = iterations
226        self.widget = widget
227        self.count = 0
228        self.widget.progressBarInit()
229       
230    def advance(self, count=1):
231        self.count += count
232        value = int(self.count*100/self.iter)
233        QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", partial(self.widget.progressBarSet, value)))
234
235    def finish(self):
236        QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", self.widget.progressBarFinished))
237       
238    def progressBarSet(self, value):
239        QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", partial(self.widget.progressBarSet, value)))
240   
241    @pyqtSignature("queuedInvoke(PyQt_PyObject)")
242    def queuedInvoke(self, func):
243        func()
244       
245       
246class synchronized(object):
247    def __init__(self, object, mode=QMutex.Recursive):
248        if not hasattr(object, "_mutex"):
249            object._mutex = QMutex(mode)
250        self.mutex = object._mutex
251       
252    def __enter__(self):
253        self.mutex.lock()
254        return self
255   
256    def __exit__(self, exc_type=None, exc_value=None, tb=None):
257        self.mutex.unlock()
258
259_global_thread_pools = {}
260       
261       
262def threadPool(self, class_="global", private=False, maxCount=None):
263    with synchronized(threadPool):
264        if private:
265            pools = self._private_thread_pools
266        else:
267            pools = _global_thread_pools
268           
269        if class_ not in pools:
270            if class_ == "global":
271                instance = QThreadPool.globalInstance()
272            else:
273                instance = QThreadPool()
274                instance.setMaxThreadCount(maxCount)
275            pools[class_] = instance
276        return pools[class_]
277   
278OWBaseWidget.threadPool = threadPool
279       
280
281"""\
282A multiprocessing like API
283==========================
284
285Incomplette
286"""
287
288class Process(AsyncCall):
289    _process_id = 0
290    def __init__(group=None, target=None, name=None, args=(), kwargs={}):
291        self.worker = WorkerThread()
292        AsyncCall.__init__(self, thread=self.worker)
293       
294        self.conenct(self, SIGANL("finished(QString)"), self.onFinished, Qt.QueuedConnection)
295        self.connect(self, SIGNAL("finished(QString)"), lambda:self.worker.quit(), Qt.QueuedConnection)
296        self.target = target
297        self.args = args
298        self.kwargs = kwargs
299        if name is None:
300            self.name = "Process-%i" % self._process_id
301            Process._process_id += 1
302        else:
303            self.name = name
304        self.exitcode = -1
305           
306    def start(self):
307        self.worker.start()
308        self.async_call(self.run)
309
310    def run(self):
311        self._result = self.target(*self.args, **self.kwargs)
312         
313    def join(self):
314        while self.poll() is None:
315            time.sleep(10)
316
317    def is_alive(self):
318        return self.poll() is None
319   
320    def onFinished(self, string):
321        self.exitcode = self._status
322       
323    def terminate(self):
324        self.worker.terminate()
325   
326from Queue import Queue
327
328class Pool(QObject):
329    def __init__(self, processes=None):
330        if processes is None:
331            import multiprocessing
332            processes = multiprocessing.cpu_count()
333        self.processes = processes
334        self.pool = [Process() for i in range(processes)]
335        self._i = 0
336    def get_process(self):
337        self._i = (self._i + 1) % len(self.pool)
338        return self.pool[self._i]
339     
340    def apply_async(func, args, kwargs):
341        process = self.get_process()
342        process.start()
343       
344    def start(self, ):
345        pass
346   
Note: See TracBrowser for help on using the repository browser.