source: orange-bioinformatics/obiData.py @ 550:6c1db8e91f12

Revision 550:6c1db8e91f12, 9.1 KB checked in by ales_erjavec <ales.erjavec@…>, 6 years ago (diff)
Line 
1from __future__ import with_statement
2import ftplib
3import urllib, urllib2
4import threading
5import os
6import time
7import socket
8from Queue import Queue
9from StringIO import StringIO
10from datetime import datetime
11from threading import RLock
12from obiGenomicsUpdate import synchronized
13
14class FileNotFoundError(IOError):
15    pass
16
17class SharedCache(object):
18    def __init__(self, *args, **kwargs):
19        self._dict = dict(*args, **kwargs)
20        self._lock = RLock()
21
22    def __getitem__(self, key):
23        with self._lock:
24            return self._dict.__getitem__(key)
25
26    def __setitem__(self, key, item):
27        with self._lock:
28            return self._dict.__setitem__(key, item)
29
30    def __delitem__(self, key):
31        with self._lock:
32            return self._dict.__delitem__(key)
33
34    def __contains__(self, key):
35        with self._lock:
36            return self._dict.__contains__(key)
37
38    def __getattr__(self, name):
39        try:
40            attr = getattr(self._dict, name)
41            if callable(attr):
42                return synchronized(self._lock)(attr)
43            else:
44                return attr
45        except AttributeError:
46            raise AttributeError(name)
47       
48class _ftpCallbackWrapper(object):
49    def __init__(self, size, callback, progressCallback):
50        self.size = max(size, 1)
51        self.callback = callback
52        self.progressCallback = progressCallback
53        self.transferCount = 0
54
55    def __call__(self, block):
56        self.callback(block)
57        self.progressCallback(min(int(100.0*self.transferCount/self.size), 100))
58        self.transferCount+=len(block)
59
60_monthDict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6, "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
61
62class FtpWorker(object):
63    def __init__(self,  ftpAddr, statCache = None):
64        self.ftpAddr = ftpAddr
65        self.ftp = None
66        self.now = datetime.now()
67        self.statCache = statCache if statCache != None else SharedCache()
68
69    def connect(self):
70        if not self.ftp:
71            self.ftp = ftplib.FTP()
72        self.ftp.connect(self.ftpAddr)
73        self.ftp.sock.settimeout(30)
74        self.ftp.login()
75       
76    def retrieve(self, filename, local, update, progressCallback=None):
77        local = os.path.normpath(local)
78        isLocal = self.isLocal(local)
79        if not update and isLocal:
80            return
81        retryCount = 0
82        while retryCount<3:
83            if not self.ftp:
84                self.ftp = ftplib.FTP()
85                self.connect()
86            try:
87                retryCount+=1
88                size, date = self.statFtp(filename)
89                #if update and update!="force":
90                if isLocal:
91                    sizeLocal, dateLocal = self.statLocal(local)
92                    if sizeLocal!=size:
93                        update = "force"
94                    #print dateLocal, date, dateLocal < date
95                    if dateLocal < date: # TODO fix date comparison
96                        update = "force"
97                if update=="force" or not isLocal:
98                    s = StringIO()
99##                    f = open(local + ".tmp", "wb")
100                    if progressCallback:
101                        self.ftp.retrbinary("RETR "+filename, _ftpCallbackWrapper(size, s.write, progressCallback), )
102                    else:
103                        self.ftp.retrbinary("RETR "+filename, s.write)
104                    s.getvalue()
105                    if s.len>size:
106                        raise Exception("Wrong size of file "+filename)
107                    f = open(local, "wb")
108                    f.write(s.buf)
109                    f.flush()
110                    f.close()
111##                    try:
112##                        if os.path.exists(local):
113##                            os.remove(local)
114##                        os.rename(local + ".tmp", local)
115##                    except Exception, ex:
116##                        print ex, local
117##                        raise
118                    break
119            except ftplib.error_perm, ex:
120                #print ex
121                if ex.args[0].startswith("550"):
122                    self.connect()
123                else:
124                    break
125            except ftplib.error_temp, ex:
126                #print ex
127                break
128            except socket.error:
129                self.connect()
130            except FileNotFoundError:
131                break
132   
133    def isLocal(self, filename):
134        try:
135            open(filename)
136            return True
137        except:
138            return False
139       
140    def statFtp(self, filename):
141        if not self.ftp:
142            self.connect()
143        dir, file = os.path.split(filename)
144        dir = dir + "/"
145        with self.statCache._lock:
146            if dir in self.statCache:
147                if file:
148                    try:
149                        s = self.statCache[dir][file].split()
150                    except KeyError:
151                        raise FileNotFoundError(filename)
152                else:
153                    return
154            else:
155                lines = []
156##                print "Ftp Stat:", dir, file
157                self.ftp.dir(dir, lines.append)
158                self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
159                if file:
160                    try:
161                        s = self.statCache[dir][file].split()
162                    except KeyError:
163                        raise FileNotFoundError(filename)
164                else:
165                    return
166##                print dir ,file, s
167    ##            s = s.getvalue().split()
168        size, date = int(s[-5]), s[-4:-1]
169        if ":" in date[-1]:
170            date = datetime(self.now.year, _monthDict.get(date[0], 1), int(date[1]))
171            if date>self.now:
172                date = datetime(date.year-1, date.month, date.day)
173        else:
174            date = datetime(int(date[-1]), _monthDict.get(date[0], 1), int(date[1]))
175        return size, date
176
177    def statLocal(self, filename):
178        stat = os.stat(filename)
179        return stat.st_size, datetime.fromtimestamp(stat.st_mtime)
180
181class FtpThreadWorker(threading.Thread, FtpWorker):
182    def __init__(self, ftpAddr, queue, statCache=None, group=None, target=None, name=None, args=(), kwargs={}):
183        threading.Thread.__init__(self, group, target, name, args, kwargs)
184        FtpWorker.__init__(self, ftpAddr, statCache)
185        self.queue = queue
186
187    def run(self):
188        while True:
189            filename, local, update, retryCount, progressCallback = self.queue.get()
190            self.retrieve(filename, local, update, progressCallback)
191            self.queue.task_done()
192
193class FtpDownloader(object):
194    def __init__(self, ftpAddr, localDir, ftpDir="", numOfThreads=5):
195        self.ftpAddr = ftpAddr
196        self.localDir = localDir
197        self.ftpDir = ftpDir
198        self.numOfThreads = numOfThreads
199        self.queue = Queue(0)
200        self.workers = []
201        self.statCache = SharedCache()
202        self.ftpWorker = FtpWorker(self.ftpAddr, statCache=self.statCache)
203
204    def initWorkers(self):
205        if self.workers:
206            return
207        for i in range(self.numOfThreads):
208            try:
209                t = FtpThreadWorker(self.ftpAddr, self.queue, self.statCache, name="FtpThread(%s):#%i"%(self.ftpAddr, i))
210                t.setDaemon(True)
211                t.start()
212                self.workers.append(t)
213            except Exception, ex:
214                print ex
215                print "Cannot create thread num: "+str(i)
216               
217    def massRetrieve(self, filenames, update=False, blocking=True, progressCallback=None):
218        self.initWorkers()
219        for filename in filenames:
220            self.retrieve(filename, update, blocking=False)
221##            localDir = os.path.split(self.localDir+filename)[0]
222##            try:
223##                os.makedirs(localDir)
224##            except:
225##                pass
226##            self.queue.put((self.ftpDir+filename, self.localDir+filename, update, 0, None))
227        if blocking:
228            self.wait(progressCallback)
229           
230    def retrieve(self, filename, update=False, blocking=True, progressCallback=None):
231        if type(filename) == str:
232            filename = (filename, filename)
233        localDir = os.path.split(os.path.join(self.localDir, filename[1]))[0]
234        try:
235            os.makedirs(localDir)
236        except:
237            pass
238        if blocking:
239            self.ftpWorker.retrieve(self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, progressCallback)
240        else:
241            self.queue.put((self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, 0, progressCallback))
242
243    def wait(self, progressCallback=None):
244        count = self.queue.qsize()
245        while not self.queue.empty():
246            if progressCallback:
247                progressCallback(min(100.0, 100.0*(float(count)-self.queue.qsize())/count))
248            time.sleep(0.1)
249        self.queue.join()
250
251##class HTTPDownloader(DownloaderBase):
252##    def __init__(self, *args, **kwargs):
253##        DownloaderBase.__init__(self, *args, **kwargs)
254##
255##    def retrieve(self, filename):
256##       
Note: See TracBrowser for help on using the repository browser.