source: orange-bioinformatics/Orange/bioinformatics/obiData.py @ 1625:cefeb35cbfc9

Revision 1625:cefeb35cbfc9, 9.8 KB checked in by mitar, 2 years ago (diff)

Moving files around.

Line 
1from __future__ import with_statement
2import ftplib
3import urllib, urllib2
4import threading
5import os, sys
6import time
7import socket
8from Queue import Queue
9from StringIO import StringIO
10from datetime import datetime
11from threading import RLock
12from obiGenomicsUpdate import synchronized
13
14class FileNotFoundError(IOError):
15    pass
16
17class SharedCache(object):
18    def __init__(self, *args, **kwargs):
19        self._dict = dict(*args, **kwargs)
20        self._lock = RLock()
21
22    def __getitem__(self, key):
23        with self._lock:
24            return self._dict.__getitem__(key)
25
26    def __setitem__(self, key, item):
27        with self._lock:
28            return self._dict.__setitem__(key, item)
29
30    def __delitem__(self, key):
31        with self._lock:
32            return self._dict.__delitem__(key)
33
34    def __contains__(self, key):
35        with self._lock:
36            return self._dict.__contains__(key)
37
38    def __getattr__(self, name):
39        try:
40            attr = getattr(self._dict, name)
41            if callable(attr):
42                return synchronized(self._lock)(attr)
43            else:
44                return attr
45        except AttributeError:
46            raise AttributeError(name)
47       
48class _ftpCallbackWrapper(object):
49    def __init__(self, size, callback, progressCallback):
50        self.size = max(size, 1)
51        self.callback = callback
52        self.progressCallback = progressCallback
53        self.transferCount = 0
54
55    def __call__(self, block):
56        self.callback(block)
57        self.progressCallback(min(int(100.0*self.transferCount/self.size), 100))
58        self.transferCount+=len(block)
59
60_monthDict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6, "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
61
62class FtpWorker(object):
63    def __init__(self,  ftpAddr, statCache = None):
64        self.ftpAddr = ftpAddr
65        self.ftp = None
66        self.now = datetime.now()
67        self.statCache = statCache if statCache != None else SharedCache()
68
69    def connect(self):
70        if not self.ftp:
71            self.ftp = ftplib.FTP()
72        self.ftp.connect(self.ftpAddr)
73        self.ftp.sock.settimeout(30)
74        self.ftp.login()
75       
76    def retrieve(self, filename, local, update, progressCallback=None):
77        local = os.path.normpath(local)
78        isLocal = self.isLocal(local)
79        if not update and isLocal:
80            return
81        retryCount = 0
82        while retryCount<3:
83            if not self.ftp:
84                self.ftp = ftplib.FTP()
85                self.connect()
86            try:
87                retryCount+=1
88                size, date = self.statFtp(filename)
89                #if update and update!="force":
90                if isLocal:
91                    sizeLocal, dateLocal = self.statLocal(local)
92                    if sizeLocal!=size:
93                        update = "force"
94                    #print dateLocal, date, dateLocal < date
95                    if dateLocal < date: # TODO fix date comparison
96                        update = "force"
97                if update=="force" or not isLocal:
98                    s = StringIO()
99##                    f = open(local + ".tmp", "wb")
100                    if progressCallback:
101                        self.ftp.retrbinary("RETR "+filename, _ftpCallbackWrapper(size, s.write, progressCallback), )
102                    else:
103                        self.ftp.retrbinary("RETR "+filename, s.write)
104                    s.getvalue()
105                    if s.len>size:
106                        raise Exception("Wrong size of file "+filename)
107                    f = open(local, "wb")
108                    f.write(s.buf)
109                    f.flush()
110                    f.close()
111##                    try:
112##                        if os.path.exists(local):
113##                            os.remove(local)
114##                        os.rename(local + ".tmp", local)
115##                    except Exception, ex:
116##                        print ex, local
117##                        raise
118                    break
119            except ftplib.error_perm, ex:
120                if ex.args[0].startswith("550"):
121                    self.connect()
122                else:
123                    raise
124            except ftplib.error_temp, ex:
125                if retryCount >= 3:
126                    raise
127                else:
128                    time.sleep(3)
129            except socket.error:
130                if retryCount >= 3:
131                    raise
132                else:
133                    self.connect()
134            except FileNotFoundError:
135                raise
136   
137    def isLocal(self, filename):
138        try:
139            open(filename)
140            return True
141        except:
142            return False
143       
144    def statFtp(self, filename):
145        if not self.ftp:
146            self.connect()
147        dir, file = os.path.split(filename)
148        dir = dir + "/"
149        with self.statCache._lock:
150            if dir in self.statCache:
151                if file:
152                    try:
153                        s = self.statCache[dir][file].split()
154                    except KeyError:
155                        raise FileNotFoundError(filename)
156                else:
157                    return
158            else:
159                lines = []
160##                print "Ftp Stat:", dir, file
161                self.ftp.dir(dir, lines.append)
162                self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
163                if file:
164                    try:
165                        s = self.statCache[dir][file].split()
166                    except KeyError:
167                        raise FileNotFoundError(filename)
168                else:
169                    return
170##                print dir ,file, s
171    ##            s = s.getvalue().split()
172        size, date = int(s[-5]), s[-4:-1]
173        if ":" in date[-1]:
174            date = datetime(self.now.year, _monthDict.get(date[0], 1), int(date[1]))
175            if date>self.now:
176                date = datetime(date.year-1, date.month, date.day)
177        else:
178            date = datetime(int(date[-1]), _monthDict.get(date[0], 1), int(date[1]))
179        return size, date
180
181    def statLocal(self, filename):
182        stat = os.stat(filename)
183        return stat.st_size, datetime.fromtimestamp(stat.st_mtime)
184   
185    def listdir(self, ftp_dir):
186        """ List the contents of a remote ftp directory (similar to os.listdir)
187        """
188        if not self.ftp:
189            self.connect()
190        lines = []
191        self.ftp.dir(ftp_dir, lines.append)
192        self.statCache[dir] = dict([(line.split()[-1].strip(), line.strip()) for line in lines if line.strip()])
193        contents = [line.split()[-1] for line in lines]
194        return [name for name in contents if name not in [".", ".."]] 
195       
196
197class FtpThreadWorker(threading.Thread, FtpWorker):
198    def __init__(self, ftpAddr, queue, statCache=None, group=None, target=None, name=None, args=(), kwargs={}):
199        threading.Thread.__init__(self, group, target, name, args, kwargs)
200        FtpWorker.__init__(self, ftpAddr, statCache)
201        self.queue = queue
202
203    def run(self):
204        while True:
205            filename, local, update, retryCount, progressCallback = self.queue.get()
206            try:
207                self.retrieve(filename, local, update, progressCallback)
208            except Exception, ex:
209                sys.excepthook(*sys.exc_info())
210            self.queue.task_done()
211
212class FtpDownloader(object):
213    def __init__(self, ftpAddr, localDir, ftpDir="", numOfThreads=5):
214        self.ftpAddr = ftpAddr
215        self.localDir = localDir
216        self.ftpDir = ftpDir
217        self.numOfThreads = numOfThreads
218        self.queue = Queue(0)
219        self.workers = []
220        self.statCache = SharedCache()
221        self.ftpWorker = FtpWorker(self.ftpAddr, statCache=self.statCache)
222
223    def initWorkers(self):
224        if self.workers:
225            return
226        for i in range(self.numOfThreads):
227            try:
228                t = FtpThreadWorker(self.ftpAddr, self.queue, self.statCache, name="FtpThread(%s):#%i"%(self.ftpAddr, i))
229                t.setDaemon(True)
230                t.start()
231                self.workers.append(t)
232            except Exception, ex:
233                print ex
234                print "Cannot create thread num: "+str(i)
235               
236    def massRetrieve(self, filenames, update=False, blocking=True, progressCallback=None):
237        self.initWorkers()
238        for filename in filenames:
239            self.retrieve(filename, update, blocking=False)
240##            localDir = os.path.split(self.localDir+filename)[0]
241##            try:
242##                os.makedirs(localDir)
243##            except:
244##                pass
245##            self.queue.put((self.ftpDir+filename, self.localDir+filename, update, 0, None))
246        if blocking:
247            self.wait(progressCallback)
248           
249    def retrieve(self, filename, update=False, blocking=True, progressCallback=None):
250        if type(filename) == str:
251            filename = (filename, filename)
252        localDir = os.path.split(os.path.join(self.localDir, filename[1]))[0]
253        try:
254            os.makedirs(localDir)
255        except:
256            pass
257        if blocking:
258            self.ftpWorker.retrieve(self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, progressCallback)
259        else:
260            self.queue.put((self.ftpDir+filename[0], os.path.join(self.localDir, filename[1]), update, 0, progressCallback))
261
262    def wait(self, progressCallback=None):
263        count = self.queue.qsize()
264        while not self.queue.empty():
265            if progressCallback:
266                progressCallback(min(100.0, 100.0*(float(count)-self.queue.qsize())/count))
267            time.sleep(0.1)
268        self.queue.join()
269       
270    def listdir(self, ftp_dir):
271        """ List the contents of the remote ftp dir (similar to os.listdir)
272        """
273        return self.ftpWorker.listdir(self.ftpDir + ftp_dir)
274   
Note: See TracBrowser for help on using the repository browser.