source: orange-bioinformatics/obiData.py @ 1339:dad0ff1a2018

Revision 1339:dad0ff1a2018, 9.1 KB checked in by ales_erjavec <ales.erjavec@…>, 3 years ago (diff)

Re-raise the FTP error exception on the last retry.

Line 
1from __future__ import with_statement
2import ftplib
3import urllib, urllib2
4import threading
5import os
6import time
7import socket
8from Queue import Queue
9from StringIO import StringIO
10from datetime import datetime
11from threading import RLock
12from obiGenomicsUpdate import synchronized
13
14class FileNotFoundError(IOError):
15    pass
16
17class SharedCache(object):
18    def __init__(self, *args, **kwargs):
19        self._dict = dict(*args, **kwargs)
20        self._lock = RLock()
21
22    def __getitem__(self, key):
23        with self._lock:
24            return self._dict.__getitem__(key)
25
26    def __setitem__(self, key, item):
27        with self._lock:
28            return self._dict.__setitem__(key, item)
29
30    def __delitem__(self, key):
31        with self._lock:
32            return self._dict.__delitem__(key)
33
34    def __contains__(self, key):
35        with self._lock:
36            return self._dict.__contains__(key)
37
38    def __getattr__(self, name):
39        try:
40            attr = getattr(self._dict, name)
41            if callable(attr):
42                return synchronized(self._lock)(attr)
43            else:
44                return attr
45        except AttributeError:
46            raise AttributeError(name)
47       
48class _ftpCallbackWrapper(object):
49    def __init__(self, size, callback, progressCallback):
50        self.size = max(size, 1)
51        self.callback = callback
52        self.progressCallback = progressCallback
53        self.transferCount = 0
54
55    def __call__(self, block):
56        self.callback(block)
57        self.progressCallback(min(int(100.0*self.transferCount/self.size), 100))
58        self.transferCount+=len(block)
59
60_monthDict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6, "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
61
62class FtpWorker(object):
63    def __init__(self,  ftpAddr, statCache = None):
64        self.ftpAddr = ftpAddr
65        self.ftp = None
66        self.now = datetime.now()
67        self.statCache = statCache if statCache != None else SharedCache()
68
69    def connect(self):
70        if not self.ftp:
71            self.ftp = ftplib.FTP()
72        self.ftp.connect(self.ftpAddr)
73        self.ftp.sock.settimeout(30)
74        self.ftp.login()
75       
76    def retrieve(self, filename, local, update, progressCallback=None):
77        local = os.path.normpath(local)
78        isLocal = self.isLocal(local)
79        if not update and isLocal:
80            return
81        retryCount = 0
82        while retryCount<3:
83            if not self.ftp:
84                self.ftp = ftplib.FTP()
85                self.connect()
86            try:
87                retryCount+=1
88                size, date = self.statFtp(filename)
89                #if update and update!="force":
90                if isLocal:
91                    sizeLocal, dateLocal = self.statLocal(local)
92                    if sizeLocal!=size:
93                        update = "force"
94                    #print dateLocal, date, dateLocal < date
95                    if dateLocal < date: # TODO fix date comparison
96                        update = "force"
97                if update=="force" or not isLocal:
98                    s = StringIO()
99##                    f = open(local + ".tmp", "wb")
100                    if progressCallback:
101                        self.ftp.retrbinary("RETR "+filename, _ftpCallbackWrapper(size, s.write, progressCallback), )
102                    else:
103                        self.ftp.retrbinary("RETR "+filename, s.write)
104                    s.getvalue()
105                    if s.len>size:
106                        raise Exception("Wrong size of file "+filename)
107                    f = open(local, "wb")
108                    f.write(s.buf)
109                    f.flush()
110                    f.close()
111##                    try:
112##                        if os.path.exists(local):
113##                            os.remove(local)
114##                        os.rename(local + ".tmp", local)
115##                    except Exception, ex:
116##                        print ex, local
117##                        raise
118                    break
119            except ftplib.error_perm, ex:
120                if ex.args[0].startswith("550"):
121                    self.connect()
122                else:
123                    raise
124            except ftplib.error_temp, ex:
125                if retryCount >= 3:
126                    raise
127            except socket.error:
128                if retryCount >= 3:
129                    raise
130                else:
131                    self.connect()
132            except FileNotFoundError:
133                raise
134   
135    def isLocal(self, filename):
136        try:
137            open(filename)
138            return True
139        except:
140            return False
141       
142    def statFtp(self, filename):
143        if not self.ftp:
144            self.connect()
145        dir, file = os.path.split(filename)
146        dir = dir + "/"
147        with self.statCache._lock:
148            if dir in self.statCache:
149                if file:
150                    try:
151                        s = self.statCache[dir][file].split()
152                    except KeyError:
153                        raise FileNotFoundError(filename)
154                else:
155                    return
156            else:
157                lines = []
158##                print "Ftp Stat:", dir, file
159                self.ftp.dir(dir, lines.append)
160                self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
161                if file:
162                    try:
163                        s = self.statCache[dir][file].split()
164                    except KeyError:
165                        raise FileNotFoundError(filename)
166                else:
167                    return
168##                print dir ,file, s
169    ##            s = s.getvalue().split()
170        size, date = int(s[-5]), s[-4:-1]
171        if ":" in date[-1]:
172            date = datetime(self.now.year, _monthDict.get(date[0], 1), int(date[1]))
173            if date>self.now:
174                date = datetime(date.year-1, date.month, date.day)
175        else:
176            date = datetime(int(date[-1]), _monthDict.get(date[0], 1), int(date[1]))
177        return size, date
178
179    def statLocal(self, filename):
180        stat = os.stat(filename)
181        return stat.st_size, datetime.fromtimestamp(stat.st_mtime)
182
183class FtpThreadWorker(threading.Thread, FtpWorker):
184    def __init__(self, ftpAddr, queue, statCache=None, group=None, target=None, name=None, args=(), kwargs={}):
185        threading.Thread.__init__(self, group, target, name, args, kwargs)
186        FtpWorker.__init__(self, ftpAddr, statCache)
187        self.queue = queue
188
189    def run(self):
190        while True:
191            filename, local, update, retryCount, progressCallback = self.queue.get()
192            self.retrieve(filename, local, update, progressCallback)
193            self.queue.task_done()
194
195class FtpDownloader(object):
196    def __init__(self, ftpAddr, localDir, ftpDir="", numOfThreads=5):
197        self.ftpAddr = ftpAddr
198        self.localDir = localDir
199        self.ftpDir = ftpDir
200        self.numOfThreads = numOfThreads
201        self.queue = Queue(0)
202        self.workers = []
203        self.statCache = SharedCache()
204        self.ftpWorker = FtpWorker(self.ftpAddr, statCache=self.statCache)
205
206    def initWorkers(self):
207        if self.workers:
208            return
209        for i in range(self.numOfThreads):
210            try:
211                t = FtpThreadWorker(self.ftpAddr, self.queue, self.statCache, name="FtpThread(%s):#%i"%(self.ftpAddr, i))
212                t.setDaemon(True)
213                t.start()
214                self.workers.append(t)
215            except Exception, ex:
216                print ex
217                print "Cannot create thread num: "+str(i)
218               
219    def massRetrieve(self, filenames, update=False, blocking=True, progressCallback=None):
220        self.initWorkers()
221        for filename in filenames:
222            self.retrieve(filename, update, blocking=False)
223##            localDir = os.path.split(self.localDir+filename)[0]
224##            try:
225##                os.makedirs(localDir)
226##            except:
227##                pass
228##            self.queue.put((self.ftpDir+filename, self.localDir+filename, update, 0, None))
229        if blocking:
230            self.wait(progressCallback)
231           
232    def retrieve(self, filename, update=False, blocking=True, progressCallback=None):
233        if type(filename) == str:
234            filename = (filename, filename)
235        localDir = os.path.split(os.path.join(self.localDir, filename[1]))[0]
236        try:
237            os.makedirs(localDir)
238        except:
239            pass
240        if blocking:
241            self.ftpWorker.retrieve(self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, progressCallback)
242        else:
243            self.queue.put((self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, 0, progressCallback))
244
245    def wait(self, progressCallback=None):
246        count = self.queue.qsize()
247        while not self.queue.empty():
248            if progressCallback:
249                progressCallback(min(100.0, 100.0*(float(count)-self.queue.qsize())/count))
250            time.sleep(0.1)
251        self.queue.join()
252
253##class HTTPDownloader(DownloaderBase):
254##    def __init__(self, *args, **kwargs):
255##        DownloaderBase.__init__(self, *args, **kwargs)
256##
257##    def retrieve(self, filename):
258##       
Note: See TracBrowser for help on using the repository browser.