source: orange-bioinformatics/obiData.py @ 1343:67c0a2030d8c

Revision 1343:67c0a2030d8c, 9.2 KB checked in by ales_erjavec <ales.erjavec@…>, 3 years ago (diff)

Improved ftplib.error_temp handling (waiting 3 seconds before retrying).
Display an error in the GEO Datasets widget after 3 retrys.

Line 
1from __future__ import with_statement
2import ftplib
3import urllib, urllib2
4import threading
5import os
6import time
7import socket
8from Queue import Queue
9from StringIO import StringIO
10from datetime import datetime
11from threading import RLock
12from obiGenomicsUpdate import synchronized
13
14class FileNotFoundError(IOError):
15    pass
16
17class SharedCache(object):
18    def __init__(self, *args, **kwargs):
19        self._dict = dict(*args, **kwargs)
20        self._lock = RLock()
21
22    def __getitem__(self, key):
23        with self._lock:
24            return self._dict.__getitem__(key)
25
26    def __setitem__(self, key, item):
27        with self._lock:
28            return self._dict.__setitem__(key, item)
29
30    def __delitem__(self, key):
31        with self._lock:
32            return self._dict.__delitem__(key)
33
34    def __contains__(self, key):
35        with self._lock:
36            return self._dict.__contains__(key)
37
38    def __getattr__(self, name):
39        try:
40            attr = getattr(self._dict, name)
41            if callable(attr):
42                return synchronized(self._lock)(attr)
43            else:
44                return attr
45        except AttributeError:
46            raise AttributeError(name)
47       
48class _ftpCallbackWrapper(object):
49    def __init__(self, size, callback, progressCallback):
50        self.size = max(size, 1)
51        self.callback = callback
52        self.progressCallback = progressCallback
53        self.transferCount = 0
54
55    def __call__(self, block):
56        self.callback(block)
57        self.progressCallback(min(int(100.0*self.transferCount/self.size), 100))
58        self.transferCount+=len(block)
59
60_monthDict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6, "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
61
62class FtpWorker(object):
63    def __init__(self,  ftpAddr, statCache = None):
64        self.ftpAddr = ftpAddr
65        self.ftp = None
66        self.now = datetime.now()
67        self.statCache = statCache if statCache != None else SharedCache()
68
69    def connect(self):
70        if not self.ftp:
71            self.ftp = ftplib.FTP()
72        self.ftp.connect(self.ftpAddr)
73        self.ftp.sock.settimeout(30)
74        self.ftp.login()
75       
76    def retrieve(self, filename, local, update, progressCallback=None):
77        local = os.path.normpath(local)
78        isLocal = self.isLocal(local)
79        if not update and isLocal:
80            return
81        retryCount = 0
82        while retryCount<3:
83            if not self.ftp:
84                self.ftp = ftplib.FTP()
85                self.connect()
86            try:
87                retryCount+=1
88                size, date = self.statFtp(filename)
89                #if update and update!="force":
90                if isLocal:
91                    sizeLocal, dateLocal = self.statLocal(local)
92                    if sizeLocal!=size:
93                        update = "force"
94                    #print dateLocal, date, dateLocal < date
95                    if dateLocal < date: # TODO fix date comparison
96                        update = "force"
97                if update=="force" or not isLocal:
98                    s = StringIO()
99##                    f = open(local + ".tmp", "wb")
100                    if progressCallback:
101                        self.ftp.retrbinary("RETR "+filename, _ftpCallbackWrapper(size, s.write, progressCallback), )
102                    else:
103                        self.ftp.retrbinary("RETR "+filename, s.write)
104                    s.getvalue()
105                    if s.len>size:
106                        raise Exception("Wrong size of file "+filename)
107                    f = open(local, "wb")
108                    f.write(s.buf)
109                    f.flush()
110                    f.close()
111##                    try:
112##                        if os.path.exists(local):
113##                            os.remove(local)
114##                        os.rename(local + ".tmp", local)
115##                    except Exception, ex:
116##                        print ex, local
117##                        raise
118                    break
119            except ftplib.error_perm, ex:
120                if ex.args[0].startswith("550"):
121                    self.connect()
122                else:
123                    raise
124            except ftplib.error_temp, ex:
125                if retryCount >= 3:
126                    raise
127                else:
128                    time.sleep(3)
129            except socket.error:
130                if retryCount >= 3:
131                    raise
132                else:
133                    self.connect()
134            except FileNotFoundError:
135                raise
136   
137    def isLocal(self, filename):
138        try:
139            open(filename)
140            return True
141        except:
142            return False
143       
144    def statFtp(self, filename):
145        if not self.ftp:
146            self.connect()
147        dir, file = os.path.split(filename)
148        dir = dir + "/"
149        with self.statCache._lock:
150            if dir in self.statCache:
151                if file:
152                    try:
153                        s = self.statCache[dir][file].split()
154                    except KeyError:
155                        raise FileNotFoundError(filename)
156                else:
157                    return
158            else:
159                lines = []
160##                print "Ftp Stat:", dir, file
161                self.ftp.dir(dir, lines.append)
162                self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
163                if file:
164                    try:
165                        s = self.statCache[dir][file].split()
166                    except KeyError:
167                        raise FileNotFoundError(filename)
168                else:
169                    return
170##                print dir ,file, s
171    ##            s = s.getvalue().split()
172        size, date = int(s[-5]), s[-4:-1]
173        if ":" in date[-1]:
174            date = datetime(self.now.year, _monthDict.get(date[0], 1), int(date[1]))
175            if date>self.now:
176                date = datetime(date.year-1, date.month, date.day)
177        else:
178            date = datetime(int(date[-1]), _monthDict.get(date[0], 1), int(date[1]))
179        return size, date
180
181    def statLocal(self, filename):
182        stat = os.stat(filename)
183        return stat.st_size, datetime.fromtimestamp(stat.st_mtime)
184
185class FtpThreadWorker(threading.Thread, FtpWorker):
186    def __init__(self, ftpAddr, queue, statCache=None, group=None, target=None, name=None, args=(), kwargs={}):
187        threading.Thread.__init__(self, group, target, name, args, kwargs)
188        FtpWorker.__init__(self, ftpAddr, statCache)
189        self.queue = queue
190
191    def run(self):
192        while True:
193            filename, local, update, retryCount, progressCallback = self.queue.get()
194            self.retrieve(filename, local, update, progressCallback)
195            self.queue.task_done()
196
197class FtpDownloader(object):
198    def __init__(self, ftpAddr, localDir, ftpDir="", numOfThreads=5):
199        self.ftpAddr = ftpAddr
200        self.localDir = localDir
201        self.ftpDir = ftpDir
202        self.numOfThreads = numOfThreads
203        self.queue = Queue(0)
204        self.workers = []
205        self.statCache = SharedCache()
206        self.ftpWorker = FtpWorker(self.ftpAddr, statCache=self.statCache)
207
208    def initWorkers(self):
209        if self.workers:
210            return
211        for i in range(self.numOfThreads):
212            try:
213                t = FtpThreadWorker(self.ftpAddr, self.queue, self.statCache, name="FtpThread(%s):#%i"%(self.ftpAddr, i))
214                t.setDaemon(True)
215                t.start()
216                self.workers.append(t)
217            except Exception, ex:
218                print ex
219                print "Cannot create thread num: "+str(i)
220               
221    def massRetrieve(self, filenames, update=False, blocking=True, progressCallback=None):
222        self.initWorkers()
223        for filename in filenames:
224            self.retrieve(filename, update, blocking=False)
225##            localDir = os.path.split(self.localDir+filename)[0]
226##            try:
227##                os.makedirs(localDir)
228##            except:
229##                pass
230##            self.queue.put((self.ftpDir+filename, self.localDir+filename, update, 0, None))
231        if blocking:
232            self.wait(progressCallback)
233           
234    def retrieve(self, filename, update=False, blocking=True, progressCallback=None):
235        if type(filename) == str:
236            filename = (filename, filename)
237        localDir = os.path.split(os.path.join(self.localDir, filename[1]))[0]
238        try:
239            os.makedirs(localDir)
240        except:
241            pass
242        if blocking:
243            self.ftpWorker.retrieve(self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, progressCallback)
244        else:
245            self.queue.put((self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, 0, progressCallback))
246
247    def wait(self, progressCallback=None):
248        count = self.queue.qsize()
249        while not self.queue.empty():
250            if progressCallback:
251                progressCallback(min(100.0, 100.0*(float(count)-self.queue.qsize())/count))
252            time.sleep(0.1)
253        self.queue.join()
254
255##class HTTPDownloader(DownloaderBase):
256##    def __init__(self, *args, **kwargs):
257##        DownloaderBase.__init__(self, *args, **kwargs)
258##
259##    def retrieve(self, filename):
260##       
Note: See TracBrowser for help on using the repository browser.