Changeset 11777:77ff33109990 in orange


Ignore:
Timestamp:
11/29/13 18:34:29 (5 months ago)
Author:
Ales Erjavec <ales.erjavec@…>
Branch:
default
Message:

Added locking for thread safe downloads.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • Orange/utils/serverfiles.py

    r11653 r11777  
    120120import urllib2 
    121121import base64 
     122import functools 
     123 
     124from contextlib import contextmanager 
    122125 
    123126from Orange.utils import ConsoleProgressBar 
     
    435438        return data 
    436439 
    437 def download(domain, filename, serverfiles=None, callback=None,  
    438     extract=True, verbose=True): 
     440 
     441def _keyed_lock(lock_constructor=threading.Lock): 
     442    lock = threading.Lock() 
     443    locks = {} 
     444 
     445    def get_lock(key): 
     446        with lock: 
     447            if key not in locks: 
     448                locks[key] = lock_constructor() 
     449 
     450            return locks[key] 
     451    return get_lock 
     452 
     453 
     454class _Lock(object): 
     455    """ 
     456    Like :class:`threading.Lock` but raises an error on reentrant acquire. 
     457    """ 
     458    def __init__(self): 
     459        self.__lock = threading.RLock() 
     460        self.__locking_thread = None 
     461 
     462    def acquire(self, blocking=True): 
     463        if self.__lock.acquire(blocking): 
     464            if self.__locking_thread is None: 
     465                self.__locking_thread = threading.current_thread() 
     466            else: 
     467                assert self.__locking_thread == threading.current_thread() 
     468                self.__lock.release() 
     469                raise Exception("Recursive lock acquire!") 
     470 
     471            return True 
     472        else: 
     473            return False 
     474 
     475    def release(self): 
     476        assert self.__locking_thread is not None 
     477        self.__locking_thread = None 
     478        self.__lock.release() 
     479 
     480    def locked(self): 
     481        return self.__lock.locked() 
     482 
     483    def __enter__(self): 
     484        self.acquire() 
     485 
     486    def __exit__(self, *arg): 
     487        self.release() 
     488 
     489# _get_lock = _keyed_lock(threading.RLock) 
     490_get_lock = _keyed_lock(_Lock) 
     491 
     492 
     493@contextmanager 
     494def _lock_file(domain, filename, blocking=False): 
     495    path = localpath(domain, filename) 
     496    path = os.path.normpath(os.path.realpath(path)) 
     497#     log.debug("locking: %s", path) 
     498    lock = _get_lock(path) 
     499    if lock.acquire(blocking): 
     500#         log.debug("got lock on: %s", path) 
     501        try: 
     502            yield 
     503        finally: 
     504            lock.release() 
     505#             log.debug("Released lock on: %s",  path) 
     506    else: 
     507        raise Exception("Could not acquire lock") 
     508 
     509 
     510def _locked(f): 
     511    @functools.wraps(f) 
     512    def func(domain, filename, *args, **kwargs): 
     513        with _lock_file(domain, filename, blocking=True): 
     514            return f(domain, filename, *args, **kwargs) 
     515    func.unwraped = f 
     516    return func 
     517 
     518 
     519@_locked 
     520def download(domain, filename, serverfiles=None, callback=None, 
     521             extract=True, verbose=True): 
    439522    """Downloads file from the repository to local orange installation. 
    440523    To download files as an authenticated user you should also pass an 
     
    479562        callback.finish() 
    480563 
     564 
     565@_locked 
    481566def localpath_download(domain, filename, **kwargs): 
    482567    """  
     
    486571    pathname = localpath(domain, filename) 
    487572    if not os.path.exists(pathname): 
    488         download(domain, filename, **kwargs) 
     573        download.unwraped(domain, filename, **kwargs) 
    489574    return pathname 
    490575 
     
    510595    return okfiles 
    511596 
     597 
     598@_locked 
    512599def remove(domain, filename): 
    513600    """Remove a file from local repository.""" 
Note: See TracChangeset for help on using the changeset viewer.