source: orange/orange/OrangeWidgets/OWConcurrent.py @ 8042:ffcb93bc9028

Revision 8042:ffcb93bc9028, 11.4 KB checked in by markotoplak, 3 years ago (diff)

Hierarchical clustering: also catch RuntimeError when importing matplotlib (or the documentation could not be built on server).

Line 
1"""\
OWConcurrent
============

General helper functions and classes for Orange Canvas
concurrent programming
7"""
8from __future__ import with_statement
9from functools import partial
10
11   
12from OWWidget import *
13
14class AsyncCall(QObject):
15    """ A wrapper class for async function calls using
16    Qt's signals for communication with GUI thread
17
18    Arguments:
19        - `func`: a function to execute
20        - `thread`: a QThread instance to execute under (default `None`,
21           threadPool is used instead)
22        - `threadPool`: a QThreadPool instance to handle thread allocation
23           (for this to work `thread` argument must be `None`)
24        - `parent`: parent object (should be None for most cases)
25       
26    Signals:
27        - `starting()`
28        - `finished(QString)` - emited when finished with an argument 'OK' on success or repr(ex) on error
29        - `finished(PyQt_PyObject, QString)` - same as above but also pass self as an argument
30        - `unhandledException(PyQt_PyObject)` - emited on error with `sys.exc_info()` argument
31        - `resultReady(PyQt_PyObject)` - emited on success with function call result as argument
32    """
33    def __init__(self, func=None, args=(), kwargs={}, thread=None, threadPool=None, parent=None):
34        QObject.__init__(self, parent)
35        self.func = func
36        self._args = args
37        self._kwargs = kwargs
38        self.threadPool = None
39        if thread is not None:
40            self.moveToThread(thread)
41        else:
42            if threadPool is None:
43                threadPool = QThreadPool.globalInstance()
44            self.threadPool = threadPool
45            self._runnable = RunnableAsyncCall(self)
46            self.threadPool.start(self._runnable)
47            self._connected = False
48            return
49           
50        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
51        self._connected = True
52
53
54    @pyqtSignature("execute()")
55    def execute(self):
56        """ Never call directly, use `__call__` or `apply_async` instead
57        """
58        assert(self.thread() is QThread.currentThread())
59        self.emit(SIGNAL("starting()"))
60        try:
61            self.result  = self.func(*self._args, **self._kwargs)
62        except Exception, ex:
63            print >> sys.stderr, "Exception in thread ", QThread.currentThread(), " while calling ", self.func
64            self.emit(SIGNAL("finished(QString)"), QString(repr(ex)))
65            self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString(repr(ex)))
66            self.emit(SIGNAL("unhandledException(PyQt_PyObject)"), sys.exc_info())
67
68            self._exc_info = sys.exc_info()
69            self._status = 1
70            return
71
72        self.emit(SIGNAL("finished(QString)"), QString("Ok"))
73        self.emit(SIGNAL("finished(PyQt_PyObject, QString)"), self, QString("Ok"))
74        self.emit(SIGNAL("resultReady(PyQt_PyObject)"), self.result)
75        self._status = 0
76
77
78    def __call__(self, *args, **kwargs):
79        """ Apply the call with args and kwargs additional arguments
80        """
81        if args or kwargs:
82            self.func = partial(self.func, *self._args, **self._kwargs)
83            self._args, self._kwargs = args, kwargs
84           
85        if not self._connected:
86            QTimer.singleShot(50, self.__call__) # delay until event loop initialized by RunnableAsyncCall
87            return
88        else:
89            self.emit(SIGNAL("_async_start()"))
90
91
92    def apply_async(self, func, args, kwargs):
93        """ call function with `args` as positional and `kwargs` as keyword
94        arguments (Overrides __init__ arguments).
95        """
96        self.func, self._args, self._kwargs = func, args, kwargs
97        self.__call__()
98
99
100    def poll(self):
101        """ Return the state of execution.
102        """
103        return getattr(self, "_status", None)
104   
105   
106    def join(self, processEvents=True):
107        """ Wait until the execution finishes.
108        """
109        while self.poll() is None:
110            QThread.currentThread().msleep(50)
111            if processEvents and QThread.currentThread() is qApp.thread():
112                qApp.processEvents()
113               
114    def get_result(self, processEvents=True):
115        """ Block until the computation completes and return the call result.
116        If the execution resulted in an exception, this method will re-raise
117        it.
118        """
119        self.join(processEvents=processEvents)
120        if self.poll() != 0:
121            # re-raise the error
122            raise self._exc_info[0], self._exc_info[1]
123        else:
124            return self.result
125   
126    def emitAdvance(self, count=1):
127        self.emit(SIGNAL("advance()"))
128        self.emit(SIGNAL("advance(int)"), count)
129       
130       
131    def emitProgressChanged(self, value):
132        self.emit(SIGNAL("progressChanged(float)"), value)
133       
134   
135    @pyqtSignature("moveToAndExecute(PyQt_PyObject)")
136    def moveToAndExecute(self, thread):
137        self.moveToThread(thread)
138       
139        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
140       
141        self.emit(SIGNAL("_async_start()"))
142       
143       
144    @pyqtSignature("moveToAndInit(PyQt_PyObject)")
145    def moveToAndInit(self, thread):
146        self.moveToThread(thread)
147       
148        self.connect(self, SIGNAL("_async_start()"), self.execute, Qt.QueuedConnection)
149        self._connected = True
150       
151
class WorkerThread(QThread):
    """ A QThread whose only job is to run its own Qt event loop.
    """
    def run(self):
        """ Thread entry point: enter the event loop of this thread.
        """
        self.exec_()
157       
158       
class RunnableTask(QRunnable):
    """ A QRunnable that runs either an AsyncCall or a plain callable.
    """
    def __init__(self, call):
        QRunnable.__init__(self)
        self._call = call
        self.setAutoDelete(False)

    def run(self):
        """ Execute the wrapped call.

        A plain callable is simply called and its return value kept in
        `self._return`. An AsyncCall is asked (through a queued invoke) to
        move itself to the current thread, and a local event loop is run
        until the call emits `finished(QString)`.
        """
        task = self._call
        if not isinstance(task, AsyncCall):
            self._return = task()
            return

        self.eventLoop = QEventLoop()
        self.eventLoop.processEvents()
        # leave the local event loop as soon as the call reports completion
        QObject.connect(task, SIGNAL("finished(QString)"), lambda message: self.eventLoop.quit())
        QMetaObject.invokeMethod(task, "moveToAndInit", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", QThread.currentThread()))
        self.eventLoop.processEvents()
        self.eventLoop.exec_()
177           
178           
class RunnableAsyncCall(RunnableTask):
    """ A RunnableTask that always treats the wrapped call as an AsyncCall.
    """
    def run(self):
        """ Ask the call to move itself to the current thread and run a
        local event loop until it emits `finished(QString)`.
        """
        self.eventLoop = QEventLoop()
        self.eventLoop.processEvents()
        # leave the local event loop as soon as the call reports completion
        QObject.connect(self._call, SIGNAL("finished(QString)"), lambda message: self.eventLoop.quit())
        threadArgument = Q_ARG("PyQt_PyObject", QThread.currentThread())
        QMetaObject.invokeMethod(self._call, "moveToAndInit", Qt.QueuedConnection, threadArgument)
        self.eventLoop.processEvents()
        self.eventLoop.exec_()
187
def createTask(call, args=(), kwargs={}, onResult=None, onStarted=None, onFinished=None, onError=None, thread=None, threadPool=None):
    """ Create an AsyncCall for `call`, hook up the optional callbacks and
    schedule it.

    Each callback that is not `None` is connected with a queued connection:
        - `onResult` to `resultReady(PyQt_PyObject)`
        - `onStarted` to `starting()`
        - `onFinished` to `finished(QString)`
        - `onError` to `unhandledException(PyQt_PyObject)`

    `thread` and `threadPool` are passed on to the AsyncCall constructor.
    Returns the AsyncCall instance.
    """
    task = AsyncCall(thread=thread, threadPool=threadPool)
    callbacks = (
        ("resultReady(PyQt_PyObject)", onResult),
        ("starting()", onStarted),
        ("finished(QString)", onFinished),
        ("unhandledException(PyQt_PyObject)", onError),
    )
    for signature, slot in callbacks:
        if slot is not None:
            task.connect(task, SIGNAL(signature), slot, Qt.QueuedConnection)
    task.apply_async(call, args, kwargs)
    return task
200       
201from functools import partial
202       
class ProgressBar(QObject):
    """ A progress callback that forwards every update to the widget
    through a queued Qt method invocation, so that the widget's progress
    bar methods are called from this object's thread. Make sure this
    object instance is created in the GUI thread or is a child of an
    object from the GUI thread
    """

    def __init__(self, widget, iterations, parent=None):
        QObject.__init__(self, parent)
        assert (qApp.thread() is self.thread())
        self.widget = widget
        self.iter = iterations
        self.count = 0
        self.widget.progressBarInit()

    def _invokeQueued(self, callback):
        """ Schedule `callback` to be called by `queuedInvoke`.
        """
        QMetaObject.invokeMethod(self, "queuedInvoke", Qt.QueuedConnection, Q_ARG("PyQt_PyObject", callback))

    def advance(self, count=1):
        """ Advance the counter by `count` and schedule a progress bar
        update with the resulting percentage of `iterations`.
        """
        self.count += count
        percent = int(self.count*100/self.iter)
        self._invokeQueued(partial(self.widget.progressBarSet, percent))

    def finish(self):
        """ Schedule a call to the widget's `progressBarFinished`.
        """
        self._invokeQueued(self.widget.progressBarFinished)

    def progressBarSet(self, value):
        """ Schedule a call to the widget's `progressBarSet` with `value`.
        """
        self._invokeQueued(partial(self.widget.progressBarSet, value))

    @pyqtSignature("queuedInvoke(PyQt_PyObject)")
    def queuedInvoke(self, func):
        """ Slot target of the queued invocations: call `func`.
        """
        func()
231       
232       
class synchronized(object):
    """ Context manager that holds a QMutex stored on `object` (as its
    `_mutex` attribute) for the duration of a `with` block. The mutex is
    created with `mode` the first time an object without `_mutex` is used.
    """
    def __init__(self, object, mode=QMutex.Recursive):
        if hasattr(object, "_mutex"):
            lock = object._mutex
        else:
            lock = object._mutex = QMutex(mode)
        self.mutex = lock

    def __enter__(self):
        """ Lock the mutex and return this context manager.
        """
        self.mutex.lock()
        return self

    def __exit__(self, exc_type=None, exc_value=None, tb=None):
        """ Unlock the mutex; exceptions from the block are not suppressed.
        """
        self.mutex.unlock()
245
# Thread pools shared by all widgets, keyed by pool class name.
_global_thread_pools = {}


def threadPool(self, class_="global", private=False, maxCount=None):
    """ Return the QThreadPool registered under the name `class_`,
    creating it on first use.

    Arguments:
        - `class_`: name of the pool; the name "global" maps to
           `QThreadPool.globalInstance()`, any other name gets its own
           new QThreadPool
        - `private`: if true the pool is looked up in (and stored on) this
           object, otherwise in the module level registry
        - `maxCount`: maximum thread count for a newly created pool; when
           `None` (default) Qt's default thread count is left untouched

    Installed as a method on OWBaseWidget (see the assignment below).
    """
    with synchronized(threadPool):
        if private:
            # This file never creates `_private_thread_pools`, so create
            # the per-object registry lazily instead of assuming it exists.
            pools = getattr(self, "_private_thread_pools", None)
            if pools is None:
                pools = self._private_thread_pools = {}
        else:
            pools = _global_thread_pools

        if class_ not in pools:
            if class_ == "global":
                instance = QThreadPool.globalInstance()
            else:
                instance = QThreadPool()
                # setMaxThreadCount expects an int; skip it for the
                # default `maxCount=None`.
                if maxCount is not None:
                    instance.setMaxThreadCount(maxCount)
            pools[class_] = instance
        return pools[class_]

OWBaseWidget.threadPool = threadPool
266       
267
268"""\
269A multiprocessing like API
270==========================
271"""
272
class Process(AsyncCall):
    """ An AsyncCall with its own WorkerThread and an interface modelled
    on `multiprocessing.Process`.

    Arguments:
        - `group`: accepted for signature compatibility, not used
        - `target`: callable invoked by `run`
        - `name`: process name (default "Process-N" using a class wide
           counter)
        - `args`: positional arguments for `target`
        - `kwargs`: keyword arguments for `target`
    """
    _process_id = 0

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.worker = WorkerThread()
        AsyncCall.__init__(self, thread=self.worker)

        self.connect(self, SIGNAL("finished(QString)"), self.onFinished, Qt.QueuedConnection)
        # stop the worker's event loop once the call has finished
        self.connect(self, SIGNAL("finished(QString)"), lambda *args: self.worker.quit(), Qt.QueuedConnection)
        self.target = target
        self.args = args
        # copy, so the shared default dict is never stored on an instance
        self.kwargs = dict(kwargs or {})
        if name is None:
            self.name = "Process-%i" % self._process_id
            Process._process_id += 1
        else:
            self.name = name
        self.exitcode = -1

    def start(self):
        """ Start the worker thread and schedule `run` on it.
        """
        # forget the outcome of a previous run so poll()/join() wait for this one
        self._status = None
        self.worker.start()
        self.apply_async(self.run, (), {})

    def run(self):
        """ Call `target` with the stored arguments; the result is kept in
        `self._result` and returned (so that `get_result` can deliver it).
        Does nothing when no target is set.
        """
        result = None
        if self.target is not None:
            result = self.target(*self.args, **self.kwargs)
        self._result = result
        return result

    def join(self, processEvents=False):
        """ Block until the call has recorded an outcome.

        Accepts the same `processEvents` argument as `AsyncCall.join`
        (which `AsyncCall.get_result` passes); by default no Qt events are
        processed while waiting.
        """
        AsyncCall.join(self, processEvents=processEvents)

    def is_alive(self):
        """ Return True while the worker thread is running and the call
        has not recorded an outcome yet.
        """
        return self.worker.isRunning() and self.poll() is None

    def onFinished(self, string):
        """ Slot for `finished(QString)`: record the exit code, 0 when the
        call reported "Ok" and 1 otherwise (the same values AsyncCall uses
        for its status).
        """
        if string == "Ok":
            self.exitcode = 0
        else:
            self.exitcode = 1

    def terminate(self):
        """ Terminate the worker thread.
        """
        self.worker.terminate()
310   
311from Queue import Queue
312
class Pool(QObject):
    """ A fixed size collection of Process objects handed out in
    round-robin order.

    Arguments:
        - `processes`: number of Process objects to create (default
           `None`: use `multiprocessing.cpu_count()`)
    """
    def __init__(self, processes=None):
        # the QObject base must be initialized before the instance is used
        QObject.__init__(self)
        if processes is None:
            import multiprocessing
            processes = multiprocessing.cpu_count()
        self.processes = processes
        self.pool = [Process() for i in range(processes)]
        self._i = 0

    def get_process(self):
        """ Return the next Process in round-robin order.
        """
        self._i = (self._i + 1) % len(self.pool)
        return self.pool[self._i]

    def apply_async(self, func, args=(), kwargs=None):
        """ Run `func(*args, **kwargs)` on the next Process of the pool.

        Returns the Process that was started.
        NOTE(review): a Process that is still busy with an earlier call is
        not waited for -- confirm whether callers need that.
        """
        if kwargs is None:
            kwargs = {}
        process = self.get_process()
        process.target = func
        process.args = args
        process.kwargs = kwargs
        process.start()
        return process

    def start(self, ):
        """ Placeholder; does nothing.
        """
        pass
331
Note: See TracBrowser for help on using the repository browser.