source: orange-bioinformatics/orangecontrib/bio/obiData.py @ 1873:0810c5708cc5

Revision 1873:0810c5708cc5, 9.8 KB checked in by Ales Erjavec <ales.erjavec@…>, 7 months ago (diff)

Moved '_bioinformatics' into orangecontrib namespace.

Line 
1from __future__ import absolute_import, with_statement
2
3import ftplib
4import urllib, urllib2
5import threading
6import os, sys
7import time
8import socket
9from Queue import Queue
10from StringIO import StringIO
11from datetime import datetime
12from threading import RLock
13
14from .obiGenomicsUpdate import synchronized
15
16class FileNotFoundError(IOError):
17    pass
18
19class SharedCache(object):
20    def __init__(self, *args, **kwargs):
21        self._dict = dict(*args, **kwargs)
22        self._lock = RLock()
23
24    def __getitem__(self, key):
25        with self._lock:
26            return self._dict.__getitem__(key)
27
28    def __setitem__(self, key, item):
29        with self._lock:
30            return self._dict.__setitem__(key, item)
31
32    def __delitem__(self, key):
33        with self._lock:
34            return self._dict.__delitem__(key)
35
36    def __contains__(self, key):
37        with self._lock:
38            return self._dict.__contains__(key)
39
40    def __getattr__(self, name):
41        try:
42            attr = getattr(self._dict, name)
43            if callable(attr):
44                return synchronized(self._lock)(attr)
45            else:
46                return attr
47        except AttributeError:
48            raise AttributeError(name)
49       
50class _ftpCallbackWrapper(object):
51    def __init__(self, size, callback, progressCallback):
52        self.size = max(size, 1)
53        self.callback = callback
54        self.progressCallback = progressCallback
55        self.transferCount = 0
56
57    def __call__(self, block):
58        self.callback(block)
59        self.progressCallback(min(int(100.0*self.transferCount/self.size), 100))
60        self.transferCount+=len(block)
61
62_monthDict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6, "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
63
64class FtpWorker(object):
65    def __init__(self,  ftpAddr, statCache = None):
66        self.ftpAddr = ftpAddr
67        self.ftp = None
68        self.now = datetime.now()
69        self.statCache = statCache if statCache != None else SharedCache()
70
71    def connect(self):
72        if not self.ftp:
73            self.ftp = ftplib.FTP()
74        self.ftp.connect(self.ftpAddr)
75        self.ftp.sock.settimeout(30)
76        self.ftp.login()
77       
78    def retrieve(self, filename, local, update, progressCallback=None):
79        local = os.path.normpath(local)
80        isLocal = self.isLocal(local)
81        if not update and isLocal:
82            return
83        retryCount = 0
84        while retryCount<3:
85            if not self.ftp:
86                self.ftp = ftplib.FTP()
87                self.connect()
88            try:
89                retryCount+=1
90                size, date = self.statFtp(filename)
91                #if update and update!="force":
92                if isLocal:
93                    sizeLocal, dateLocal = self.statLocal(local)
94                    if sizeLocal!=size:
95                        update = "force"
96                    #print dateLocal, date, dateLocal < date
97                    if dateLocal < date: # TODO fix date comparison
98                        update = "force"
99                if update=="force" or not isLocal:
100                    s = StringIO()
101##                    f = open(local + ".tmp", "wb")
102                    if progressCallback:
103                        self.ftp.retrbinary("RETR "+filename, _ftpCallbackWrapper(size, s.write, progressCallback), )
104                    else:
105                        self.ftp.retrbinary("RETR "+filename, s.write)
106                    s.getvalue()
107                    if s.len>size:
108                        raise Exception("Wrong size of file "+filename)
109                    f = open(local, "wb")
110                    f.write(s.buf)
111                    f.flush()
112                    f.close()
113##                    try:
114##                        if os.path.exists(local):
115##                            os.remove(local)
116##                        os.rename(local + ".tmp", local)
117##                    except Exception, ex:
118##                        print ex, local
119##                        raise
120                    break
121            except ftplib.error_perm, ex:
122                if ex.args[0].startswith("550"):
123                    self.connect()
124                else:
125                    raise
126            except ftplib.error_temp, ex:
127                if retryCount >= 3:
128                    raise
129                else:
130                    time.sleep(3)
131            except socket.error:
132                if retryCount >= 3:
133                    raise
134                else:
135                    self.connect()
136            except FileNotFoundError:
137                raise
138   
139    def isLocal(self, filename):
140        try:
141            open(filename)
142            return True
143        except:
144            return False
145       
146    def statFtp(self, filename):
147        if not self.ftp:
148            self.connect()
149        dir, file = os.path.split(filename)
150        dir = dir + "/"
151        with self.statCache._lock:
152            if dir in self.statCache:
153                if file:
154                    try:
155                        s = self.statCache[dir][file].split()
156                    except KeyError:
157                        raise FileNotFoundError(filename)
158                else:
159                    return
160            else:
161                lines = []
162##                print "Ftp Stat:", dir, file
163                self.ftp.dir(dir, lines.append)
164                self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
165                if file:
166                    try:
167                        s = self.statCache[dir][file].split()
168                    except KeyError:
169                        raise FileNotFoundError(filename)
170                else:
171                    return
172##                print dir ,file, s
173    ##            s = s.getvalue().split()
174        size, date = int(s[-5]), s[-4:-1]
175        if ":" in date[-1]:
176            date = datetime(self.now.year, _monthDict.get(date[0], 1), int(date[1]))
177            if date>self.now:
178                date = datetime(date.year-1, date.month, date.day)
179        else:
180            date = datetime(int(date[-1]), _monthDict.get(date[0], 1), int(date[1]))
181        return size, date
182
183    def statLocal(self, filename):
184        stat = os.stat(filename)
185        return stat.st_size, datetime.fromtimestamp(stat.st_mtime)
186   
187    def listdir(self, ftp_dir):
188        """ List the contents of a remote ftp directory (similar to os.listdir)
189        """
190        if not self.ftp:
191            self.connect()
192        lines = []
193        self.ftp.dir(ftp_dir, lines.append)
194        self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
195        contents = [line.split()[-1] for line in lines]
196        return [name for name in contents if name not in [".", ".."]] 
197       
198
199class FtpThreadWorker(threading.Thread, FtpWorker):
200    def __init__(self, ftpAddr, queue, statCache=None, group=None, target=None, name=None, args=(), kwargs={}):
201        threading.Thread.__init__(self, group, target, name, args, kwargs)
202        FtpWorker.__init__(self, ftpAddr, statCache)
203        self.queue = queue
204
205    def run(self):
206        while True:
207            filename, local, update, retryCount, progressCallback = self.queue.get()
208            try:
209                self.retrieve(filename, local, update, progressCallback)
210            except Exception, ex:
211                sys.excepthook(*sys.exc_info())
212            self.queue.task_done()
213
214class FtpDownloader(object):
215    def __init__(self, ftpAddr, localDir, ftpDir="", numOfThreads=5):
216        self.ftpAddr = ftpAddr
217        self.localDir = localDir
218        self.ftpDir = ftpDir
219        self.numOfThreads = numOfThreads
220        self.queue = Queue(0)
221        self.workers = []
222        self.statCache = SharedCache()
223        self.ftpWorker = FtpWorker(self.ftpAddr, statCache=self.statCache)
224
225    def initWorkers(self):
226        if self.workers:
227            return
228        for i in range(self.numOfThreads):
229            try:
230                t = FtpThreadWorker(self.ftpAddr, self.queue, self.statCache, name="FtpThread(%s):#%i"%(self.ftpAddr, i))
231                t.setDaemon(True)
232                t.start()
233                self.workers.append(t)
234            except Exception, ex:
235                print ex
236                print "Cannot create thread num: "+str(i)
237               
238    def massRetrieve(self, filenames, update=False, blocking=True, progressCallback=None):
239        self.initWorkers()
240        for filename in filenames:
241            self.retrieve(filename, update, blocking=False)
242##            localDir = os.path.split(self.localDir+filename)[0]
243##            try:
244##                os.makedirs(localDir)
245##            except:
246##                pass
247##            self.queue.put((self.ftpDir+filename, self.localDir+filename, update, 0, None))
248        if blocking:
249            self.wait(progressCallback)
250           
251    def retrieve(self, filename, update=False, blocking=True, progressCallback=None):
252        if type(filename) == str:
253            filename = (filename, filename)
254        localDir = os.path.split(os.path.join(self.localDir, filename[1]))[0]
255        try:
256            os.makedirs(localDir)
257        except:
258            pass
259        if blocking:
260            self.ftpWorker.retrieve(self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, progressCallback)
261        else:
262            self.queue.put((self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, 0, progressCallback))
263
264    def wait(self, progressCallback=None):
265        count = self.queue.qsize()
266        while not self.queue.empty():
267            if progressCallback:
268                progressCallback(min(100.0, 100.0*(float(count)-self.queue.qsize())/count))
269            time.sleep(0.1)
270        self.queue.join()
271       
272    def listdir(self, ftp_dir):
273        """ List the contents of the remote ftp dir (similar to os.listdir)
274        """
275        return self.ftpWorker.listdir(self.ftpDir + ftp_dir)
276   
Note: See TracBrowser for help on using the repository browser.