source: orange/orange/orngServerFiles.py @ 7115:080d8acdae18

Revision 7115:080d8acdae18, 23.5 KB checked in by ales_erjavec <ales.erjavec@…>, 3 years ago (diff)
  • fixed a problem with DownloadProgress redirect context manager running python 2.7
Line 
1"""
2orngServerFiles is a module which enables users to simply access files in
3a repository.
4
Each file is specified by two parameters: domain and filename. A domain
is something like a filesystem directory -- a container holding files.
7
A domain name should consist of fewer than 255 alphanumeric ASCII characters,
whereas filenames can be arbitrarily long and can contain any ASCII character
(including "" ~ . \ / { }). Please refrain from using non-ASCII characters in
both domain names and filenames. Files can be protected or not. Protected files
can only be accessed by authenticated users!
13
orngServerFiles is used by creating a ServerFiles object. The username
and password are passed when the object is initialized. All password
protected operations and transfers are secured by SSL: this secures both
the password and the content. Creating SSL connections on Windows platforms
has not been tested yet. It may require additional modules (Ales, please
report).
20
21An un-authenticated user can list files in a domain ("list"), download
22individual file ("download") or get individual file information ("info") --
23bytesize and datetime. Datetimes can be compared as strings.
24
An authenticated user can create and remove domains ("create_domain",
"remove_domain"), upload files ("upload"), delete them ("remove") and manage
their protection ("protect", "unprotect"). Note that newly uploaded files
whose names did not exist in the domain before are protected. They can be made
public by unprotecting them.
30
31SERVER
32
33Files are stored in a filesystem. Each domain is a filesystem directory in
34which files are stored. Each saved file also has a corresponding information
35file.
36
Current performance limitation: only one concurrent upload. This can be
overcome with smarter locking.
39
40Todo: checksum after transfer.
41"""
42
43import sys
44import socket
45
# Timeout in seconds. socket.setdefaulttimeout() sets the process-wide
# default for every socket created afterwards, so importing this module
# affects all later network connections, not only the ones made here.
timeout = 120
socket.setdefaulttimeout(timeout)
49
50import urllib2
51import base64
52
53import urllib2_file #switch to poster in the future
54#import poster.streaminghttp as psh
55#import poster.encode
56
57import os
58import shutil
59import glob
60import datetime
61import tempfile
62
# Default repository location: host plus path, without a URL scheme and with
# a trailing slash. ServerFiles prepends 'http://' or 'https://' and appends
# 'public/' or 'private/' to it.
#defserver = "localhost:9999/"
defserver = "asterix.fri.uni-lj.si/orngServerFiles/"
65
66def _parseFileInfo(fir, separ="|||||"):
67    """
68    Parses file info from server.
69    """
70    l= fir.split(separ)
71    fi = {}
72    fi["size"] = l[0]
73    fi["datetime"] = l[1]
74    fi["title"] = l[2]
75    fi["tags"] = l[3].split(";")
76    return fi
77
def openFileInfo(fname):
    """
    Read and parse a local info file.

    The file is expected to hold one field per line (size, datetime, title,
    ';'-joined tags) and is parsed by _parseFileInfo with a newline
    separator. The file handle is closed even when reading or parsing
    fails (e.g. IndexError on a truncated info file).
    """
    f = open(fname, 'rt')
    try:
        return _parseFileInfo(f.read(), separ='\n')
    finally:
        f.close()
83
def saveFileInfo(fname, info):
    """
    Write a file-info dictionary to `fname`, one field per line.

    `info` must provide the string values 'size', 'datetime' and 'title'
    and a list of strings under 'tags' (stored ';'-joined). A missing key
    raises KeyError.
    """
    # Build the content before opening the file: opening with 'wt' truncates
    # it, so a malformed `info` must fail before an existing file is touched.
    content = '\n'.join([info['size'], info['datetime'], info['title'],
                         ';'.join(info['tags'])])
    f = open(fname, 'wt')
    try:
        f.write(content)
    finally:
        f.close()
88
89def _parseList(fl):
90    return fl.split("|||||")
91
92def _parseAllFileInfo(afi):
93    separf = "[[[[["
94    separn = "====="
95    fis = afi.split(separf)
96    out = []
97    for entry in fis:
98        if entry != "":
99            name, info = entry.split(separn)
100            out.append((name, _parseFileInfo(info)))
101
102    return dict(out)
103
def createPathForFile(target):
    """
    Make sure the directory that will contain `target` exists.

    Best effort: any OSError from os.makedirs (for example because the
    directory already exists) is ignored.
    """
    parent = os.path.dirname(target)
    try:
        os.makedirs(parent)
    except OSError:
        # Deliberately ignored; see docstring.
        pass
109
def createPath(target):
    """
    Create the directory `target`, including missing parent directories.

    Best effort: any OSError from os.makedirs (for example because the
    directory already exists) is ignored.
    """
    try:
        os.makedirs(target)
    except OSError:
        # Deliberately ignored; see docstring.
        return
115
def localpath(domain=None, filename=None):
    """
    Return a path inside the local repository.

    Without `domain` this is the repository root ([bufferDir]/bigfiles);
    with `domain` only it is the domain directory; with both it is the path
    of the file. `filename` is ignored when `domain` is empty.
    """
    import orngEnviron
    parts = [orngEnviron.directoryNames["bufferDir"], "bigfiles"]
    if domain:
        parts.append(domain)
        if filename:
            parts.append(filename)
    return os.path.join(*parts)
125
class ServerFiles(object):
    """
    Client for the remote file repository.

    Requests go to the public (http) root unless both `username` and
    `password` are given, in which case they go to the private (https) root
    with an HTTP Basic Authorization header. An optional `access_code` is
    added to every request that does not set one explicitly.
    """

    def __init__(self, username=None, password=None, server=None, access_code=None):
        """Store the credentials; `server` defaults to the module's `defserver`."""
        if not server:
            server = defserver
        self.server = server
        self.secureroot = 'https://' + self.server + 'private/'
        self.publicroot = 'http://' + self.server + 'public/'
        self.username = username
        self.password = password
        self.access_code = access_code
        # Cache for search(); filled on first use by _searchinfo().
        self.searchinfo = None

    def getOpener(self):
        """Return the urllib2 opener used for all requests."""
        #commented lines are for poster 0.6
        #handlers = [psh.StreamingHTTPHandler, psh.StreamingHTTPRedirectHandler, psh.StreamingHTTPSHandler]
        #opener = urllib2.build_opener(*handlers)
        opener = urllib2.build_opener()
        return opener

    def upload(self, domain, filename, file, title="", tags=None):
        """Upload the file to the server.
        File can be an open file or a filename. A file opened here from a
        filename is closed again after the request; a file object passed in
        by the caller is left open."""
        opened_here = isinstance(file, basestring)
        if opened_here:
            file = open(file, 'rb')
        try:
            data = {'filename': filename, 'domain': domain, 'title': title,
                    'tags': ";".join(tags or []), 'data': file}
            return self._open('upload', data)
        finally:
            if opened_here:
                file.close()

    def create_domain(self, domain):
        """Create the domain in the server repository."""
        return self._open('createdomain', { 'domain': domain })

    def remove_domain(self, domain, force=False):
        """Remove the domain from the server repository. If force=True
        remove if the domain is not empty (includes files)."""
        data = { 'domain': domain }
        if force:
            data['force'] = True
        return self._open('removedomain', data)

    def remove(self, domain, filename):
        """Remove a file from the server repository."""
        return self._open('remove', { 'domain': domain, 'filename': filename })

    def unprotect(self, domain, filename):
        """Unprotect a file in the server repository (sets access code '0')."""
        return self._open('protect', { 'domain': domain, 'filename': filename, 'access_code': '0' })

    def protect(self, domain, filename, access_code="1"):
        """Protect a file in the server repository with the given access code."""
        return self._open('protect', { 'domain': domain, 'filename': filename, 'access_code': access_code })

    def protection(self, domain, filename):
        """Return the server's raw response to a 'protection' query
        (documented upstream as 1 for a protected file, else 0)."""
        return self._open('protection', { 'domain': domain, 'filename': filename })

    def listfiles(self, domain):
        """Return the list of file names in a domain on the server."""
        return _parseList(self._open('list', { 'domain': domain }))

    def list(self, domain):
        """Same as listfiles(): return the file names in a server domain."""
        return _parseList(self._open('list', { 'domain': domain }))

    def listdomains(self):
        """Return the list of domains on the server."""
        return _parseList(self._open('listdomains', {}))

    def downloadFH(self, domain, filename):
        """Return an open file-like response for the requested server file."""
        return self._handle('download', { 'domain': domain, 'filename': filename })

    def download(self, domain, filename, target, callback=None):
        """Download a file from the server into the path `target`.

        The directory of `target` is created if needed. The data is first
        buffered in a temporary file and only copied to `target` after the
        transfer has finished. `callback`, if given, is called without
        arguments roughly once per downloaded percent and once more at the
        end; no per-percent calls are made when the server reports no (or a
        zero) content length."""
        createPathForFile(target)

        fdown = self.downloadFH(domain, filename)
        buffered = tempfile.TemporaryFile()
        try:
            try:
                length = fdown.headers.getheader('content-length')
                # A missing header or an empty file gives size 0; progress
                # cannot be computed then and the percent loop is skipped.
                size = int(length) if length is not None else 0

                chunksize = 1024*8
                lastchunkreport = 0.0001
                readb = 0
                while True:
                    buf = fdown.read(chunksize)
                    readb += len(buf)

                    # One callback per completed percent since the last report.
                    while size > 0 and float(readb)/size > lastchunkreport + 0.01:
                        lastchunkreport += 0.01
                        if callback:
                            callback()
                    if not buf:
                        break
                    buffered.write(buf)
            finally:
                fdown.close()

            buffered.seek(0)
            out = open(target, "wb")
            try:
                shutil.copyfileobj(buffered, out)
            finally:
                out.close()
        finally:
            buffered.close()

        if callback:
            callback()

    def _searchinfo(self):
        """Collect {(domain, filename): info} for every file on the server."""
        infos = {}
        for dom in self.listdomains():
            for name, fileinfo in self.allinfo(dom).items():
                infos[(dom, name)] = fileinfo
        return infos

    def search(self, sstrings, **kwargs):
        """Search the server files; see the module-level _search for options.
        The server listing is fetched once and cached on the instance."""
        if not self.searchinfo:
            self.searchinfo = self._searchinfo()
        return _search(self.searchinfo, sstrings, **kwargs)

    def info(self, domain, filename):
        """Return the info dictionary of a file on the server."""
        return _parseFileInfo(self._open('info', { 'domain': domain, 'filename': filename }))

    def allinfo(self, domain):
        """Return {filename: info} for all files of a server domain."""
        return _parseAllFileInfo(self._open('allinfo', { 'domain': domain }))

    def index(self):
        """Return the server's raw response to the 'index' command."""
        return self._open('index', {})

    def _authen(self):
        """
        Did the user choose authentication?
        """
        return bool(self.username and self.password)

    def _server_request(self, root, command, data, repeat=2):
        """Open `root + command`, sending `data` if it is non-empty.

        A failed request is retried up to `repeat` more times; the exception
        of the last attempt propagates."""
        def do():
            opener = self.getOpener()
            #the next lines work for poster 0.6.0
            #datagen, headers = poster.encode.multipart_encode(data)
            #request = urllib2.Request(root+command, datagen, headers)

            if data:
                request = urllib2.Request(root+command, data)
            else:
                request = urllib2.Request(root+command)

            #directly add authorization headers
            if self._authen():
                # b64encode never inserts line breaks; encodestring wraps
                # long input, which corrupts the header for long credentials.
                auth = base64.b64encode('%s:%s' % (self.username, self.password))
                request.add_header('Authorization', 'Basic %s' % auth)

            return opener.open(request)

        if repeat <= 0:
            return do()
        try:
            return do()
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must not
            # trigger a retry.
            return self._server_request(root, command, data, repeat=repeat-1)

    def _handle(self, command, data):
        """Send a command to the public or secure root and return the
        open response."""
        # Attach the instance access code unless the request sets its own
        # 'access_code' (protect/unprotect use that key as their argument).
        if 'access_code' not in data:
            data = self.addAccessCode(data)
        addr = self.publicroot
        if self._authen():
            addr = self.secureroot
        return self._server_request(addr, command, data)

    def _open(self, command, data):
        """Send a command and return the complete response body."""
        return self._handle(command, data).read()

    def addAccessCode(self, data):
        """Return `data` with the instance access code added.

        The input dictionary is never modified: a copy is returned when an
        access code is set, otherwise `data` itself."""
        if self.access_code is not None:
            data = data.copy()
            data["access_code"] = self.access_code
        return data
311
def download(domain, filename, serverfiles=None, callback=None, extract=True, verbose=True):
    """Download a file from a server placing it in a local repository.

    The file's info is fetched first and saved next to the file as
    '<target>.info'. If `extract` is true and the file carries a
    '#uncompressed' or '#compression' special tag, it is downloaded to
    '<target>.tmp' and then unpacked:

    * '#compression:tar.gz' with a '#files' tag: extracted into the domain
      directory, and the archive itself is copied to the target path;
    * a name ending in '.tar.gz': extracted into a directory at the target
      path;
    * '#compression:gz': decompressed into the target path;
    * anything else: the downloaded file is moved to the target path as is.

    With `verbose` and no `callback`, a DownloadProgress is used as the
    progress callback.
    """
    if not serverfiles:
        serverfiles = ServerFiles()

    info = serverfiles.info(domain, filename)
    # Special tags look like '#key:value'. Split on the first colon only, so
    # values that contain ':' do not break the dict construction.
    specialtags = dict([tag.split(":", 1) for tag in info["tags"]
                        if tag.startswith("#") and ":" in tag])
    extract = extract and ("#uncompressed" in specialtags or "#compression" in specialtags)
    target = localpath(domain, filename)
    if verbose and not callback:
        callback = DownloadProgress(filename, int(info["size"]))
    serverfiles.download(domain, filename, target + ".tmp" if extract else target, callback=callback)

    #file saved, now save info file

    saveFileInfo(target + '.info', info)

    if extract:
        import tarfile
        import gzip
        import shutil
        tmpname = target + ".tmp"
        compression = specialtags.get("#compression")
        if compression == "tar.gz" and specialtags.get("#files"):
            archive = tarfile.open(tmpname)
            try:
                archive.extractall(localpath(domain))
            finally:
                archive.close()
            shutil.copyfile(tmpname, target)
        elif filename.endswith(".tar.gz"):
            # elif, not if: the branch above leaves a regular file at
            # `target`, which cannot also serve as the extraction directory.
            archive = tarfile.open(tmpname)
            try:
                try:
                    os.mkdir(target)
                except Exception:
                    pass
                archive.extractall(target)
            finally:
                archive.close()
        elif compression == "gz":
            src = gzip.open(tmpname)
            try:
                dst = open(target, "wb")
                try:
                    shutil.copyfileobj(src, dst)
                finally:
                    dst.close()
            finally:
                src.close()
        else:
            # No known compression scheme: keep the file as downloaded
            # instead of failing on an unbound archive handle.
            shutil.move(tmpname, target)
        if os.path.exists(tmpname):
            os.remove(tmpname)

    if isinstance(callback, DownloadProgress):
        callback.finish()
349
def localpath_download(domain, filename, **kwargs):
    """Return the local path of the given file, downloading the file first
    if nothing exists at that path yet. Extra keyword arguments are passed
    on to download()."""
    pathname = localpath(domain, filename)
    already_present = os.path.exists(pathname)
    if already_present:
        return pathname
    download(domain, filename, **kwargs)
    return pathname
357
def listfiles(domain):
    """Return a list of filenames in a given domain on local Orange
    installation with a valid info file: useful ones.

    A file is listed when both '<name>' and '<name>.info' exist in the
    domain directory and the info file can be parsed. A missing or
    unreadable domain directory gives an empty list."""
    directory = localpath(domain)
    try:
        candidates = [a for a in os.listdir(directory) if a.endswith('.info')]
    except OSError:
        candidates = []

    okfiles = []
    for infoname in candidates:
        name = infoname[:-5]
        # An info file without its data file is not a usable entry.
        if not os.path.exists(os.path.join(directory, name)):
            continue
        # The info file must be readable and well formed.
        try:
            openFileInfo(os.path.join(directory, infoname))
        except Exception:
            continue
        okfiles.append(name)

    return okfiles
379
380def remove(domain, filename):
381    """Remove a file from a local repository."""
382    filename = localpath(domain, filename)
383    import shutil
384   
385    specialtags = dict([tag.split(":") for tag in info(domain, filename)["tags"] if tag.startswith("#") and ":" in tag])
386    todelete = [filename, filename + ".info"] 
387    if "#files" in specialtags:
388        todelete.extend([os.path.join(localpath(domain), path) for path in specialtags.get("#files").split("!@")])
389#    print todelete
390    for path in todelete:
391        try:
392            if os.path.isdir(path):
393                shutil.rmtree(path)
394            elif os.path.isfile(path):
395                os.remove(path)
396        except OSError, ex:
397            print "Failed to delete", path, "due to:", ex
398   
def remove_domain(domain, force=False):
    """Remove a domain (directory) from the local repository.

    Without `force` the directory is removed with os.rmdir; with `force` the
    whole tree is removed with shutil.rmtree."""
    directory = localpath(domain)
    if not force:
        os.rmdir(directory)
        return
    import shutil
    shutil.rmtree(directory)
407
def listdomains():
    """Return a list of domains from a local repository: the names of the
    directories inside the repository root, which is created if missing."""
    root = localpath()
    createPath(root)
    return [entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))]
418
def info(domain, filename):
    """Returns info of a file in a local repository, read from the
    '.info' file stored next to it."""
    infofile = localpath(domain, filename) + '.info'
    return openFileInfo(infofile)
423
def allinfo(domain):
    """Returns info of all files in a specific domain in a local repository
    as a {filename: info} dictionary."""
    # info() takes the plain file name and builds the path itself. Passing
    # the full path only resolves when the buffer directory is absolute.
    return dict((filename, info(domain, filename)) for filename in listfiles(domain))
432
def needs_update(domain, filename, access_code=None):
    """Returns true if a file does not exist in the local repository
    or if there is a newer version on the server."""
    if filename not in listfiles(domain):
        return True

    dt_fmt = "%Y-%m-%d %H:%M:%S"

    def parse(stamp):
        # Only the first 19 characters ("YYYY-MM-DD HH:MM:SS") are parsed.
        return datetime.datetime.strptime(stamp[:19], dt_fmt)

    dt_local = parse(info(domain, filename)["datetime"])
    server = ServerFiles(access_code=access_code)
    dt_server = parse(server.info(domain, filename)["datetime"])
    return dt_server > dt_local
445
def update(domain, filename, access_code=None, verbose=True):
    """Downloads a file from a server placing it in a local repository
    if a file on a server is newer or its local version does not exist.

    With an `access_code`, a ServerFiles client carrying that code is used
    for the download. `verbose` is passed on to download() in both cases."""
    if needs_update(domain, filename, access_code=access_code):
        if not access_code:
            download(domain, filename, verbose=verbose)
        else:
            # ServerFiles lives in this module; there is no `orngServerFiles`
            # name in scope here.
            server = ServerFiles(access_code=access_code)
            download(domain, filename, serverfiles=server, verbose=verbose)
455       
def _searchinfo():
    """Collect {(domain, filename): info} for every file in the local
    repository."""
    infos = {}
    for dom in listdomains():
        for fname, finfo in allinfo(dom).items():
            infos[(dom, fname)] = finfo
    return infos
464
465def _search(si, sstrings, caseSensitive=False, inTag=True, inTitle=True, inName=True):
466    """
467    sstrings contain a list of search strings
468    """
469    found = []
470
471    for (dom,fn),info in si.items():
472        target = ""
473        if inTag: target += " ".join(info['tags'])
474        if inTitle: target += info['title']
475        if inName: target += fn
476        if not caseSensitive: target = target.lower()
477
478        match = True
479        for s in sstrings:
480            if not caseSensitive:
481                s = s.lower()
482            if s not in target:
483                match= False
484                break
485               
486        if match:
487            found.append((dom,fn))   
488       
489    return found
490
def search(sstrings, **kwargs):
    """Search the local repository; keyword options are those of _search."""
    return _search(_searchinfo(), sstrings, **kwargs)
494
495from orngMisc import ConsoleProgressBar
496import time, threading
497
498class DownloadProgress(ConsoleProgressBar):
499    redirect = None
500    lock = threading.RLock()
501    def sizeof_fmt(num):
502        for x in ['bytes','KB','MB','GB','TB']:
503            if num < 1024.0:
504                return "%3.1f %s" % (num, x) if x <> 'bytes' else "%1.0f %s" % (num, x)
505            num /= 1024.0
506           
507    def __init__(self, filename, size):
508        print "Downloading", filename
509        ConsoleProgressBar.__init__(self, "progress:", 20)
510        self.size = size
511        self.starttime = time.time()
512        self.speed = 0.0
513
514    def sizeof_fmt(self, num):
515        for x in ['bytes','KB','MB','GB','TB']:
516            if num < 1024.0:
517                return "%3.1f %s" % (num, x) if x <> 'bytes' else "%1.0f %s" % (num, x)
518            num /= 1024.0
519
520    def getstring(self):
521        speed = int(self.state * self.size / 100.0 / (time.time() - self.starttime))
522        eta = (100 - self.state) * self.size / 100.0 / speed
523        return ConsoleProgressBar.getstring(self) + %s  %12s/s  %3i:%02i ETA" % (self.sizeof_fmt(self.size), self.sizeof_fmt(speed), eta/60, eta%60)
524       
525    def __call__(self, *args, **kwargs):
526        ret = ConsoleProgressBar.__call__(self, *args, **kwargs)
527        if self.redirect:
528            self.redirect(self.state)
529        return ret
530   
531    class RedirectContext(object):
532        def __enter__(self):
533            DownloadProgress.lock.acquire()
534            return DownloadProgress
535       
536        def __exit__(self, ex_type, value, tb):
537            DownloadProgress.redirect = None
538            DownloadProgress.lock.release()
539            return False
540       
541    @classmethod
542    def setredirect(cls, redirect):
543        cls.redirect = staticmethod(redirect)
544        return cls.RedirectContext()
545   
546    @classmethod
547    def __enter__(cls):
548        cls.lock.acquire()
549        return cls
550   
551    @classmethod
552    def __exit__(cls, exc_type, exc_value, traceback):
553        cls.lock.release()
554        return False
555
556def consoleupdate(domains=None, searchstr="essential"):
557    domains = domains or listdomains()
558    sf = ServerFiles()
559    info = dict((d, sf.allinfo(d)) for d in domains)
560    def searchmenu():
561        def printmenu():
562            print "\tSearch tags:", search
563            print "\t1. Add tag."
564            print "\t2. Clear tags."
565            print "\t0. Return to main menu."
566            return raw_input("\tSelect option:")
567        search = searchstr
568        while True:
569            response = printmenu().strip()
570            if response == "1":
571                search += " " + raw_input("\tType new tag/tags:")
572            elif response == "2":
573                search = ""
574            elif response == "0":
575                break
576            else:
577                print "\tUnknown option!"
578        return search
579
580    def filemenu(searchstr=""):
581        files = [None]
582        for i, (dom, file) in enumerate(sf.search(searchstr.split())):
583            print "\t%i." % (i + 1), info[dom][file]["title"]
584            files.append((dom, file))
585        print "\t0. Return to main menu."
586        print "\tAction: d-download (e.g. 'd 1' downloads first file)"
587        while True:
588            response = raw_input("\tAction:").strip()
589            if response == "0":
590                break
591            try:
592                action, num = response.split(None, 1)
593                num = int(num)
594            except Exception, ex:
595                print "Unknown option!"
596                continue
597            try:
598                if action.lower() == "d":
599                    download(*(files[num]))
600                    print "\tSuccsessfully downloaded", files[num][-1]
601            except Exception, ex:
602                print "Error occured!", ex
603
604    def printmenu():
605        print "Update database main menu:"
606        print "1. Enter search tags (refine search)."
607        print "2. Print matching available files."
608        print "3. Print all available files."
609        print "4. Update all local files."
610        print "0. Exit."
611        return raw_input("Select option:")
612   
613    while True:
614        try:
615            response = printmenu().strip()
616            if response == "1":
617                searchstr = searchmenu()
618            elif response == "2":
619                filemenu(searchstr)
620            elif response == "3":
621                filemenu("")
622            elif response == "4":
623                update_local_files()
624            elif response == "0":
625                break
626            else:
627                print "Unknown option!"
628        except Exception, ex:
629            print "Error occured:", ex
630
631def update_local_files(verbose=True):
632    sf = ServerFiles()
633    for domain, filename in search(""):
634        uptodate = sf.info(domain, filename)["datetime"] <= info(domain, filename)["datetime"]
635        if not uptodate:
636            download(domain, filename, sf)
637        if verbose:
638            print filename, "Ok" if uptodate else "Updated"
639
640def update_by_tags(tags=["essential"], domains=[], verbose=True):
641    sf = ServerFiles()
642    for domain, filename in sf.search(tags + domains, inTitle=False, inName=False):
643        if domains and domain not in domain:
644            continue
645        if os.path.exists(localpath(domain, filename)+".info"):
646            uptodate = sf.info(domain, filename)["datetime"] <= info(domain, filename)["datetime"]
647        else:
648            uptodate = False
649        if not uptodate:
650            download(domain, filename, sf)
651        if verbose:
652            print filename, "Ok" if uptodate else "Updated"
653           
654def example(myusername, mypassword):
655
656    locallist = listfiles('test')
657    for l in locallist:
658        print info('test', l)
659
660    s = ServerFiles()
661
662    print "testing connection - public"
663    print "AN", s.index()
664
665    #login as an authenticated user
666    s = ServerFiles(username=myusername, password=mypassword)
667   
668    """
669    print "Server search 1"
670    import time
671    t = time.time()
672    print s.search(["rat"])
673    print time.time() - t
674
675    t = time.time()
676    print s.search(["human", "ke"])
677    print time.time() - t
678    """
679
680    print "testing connection - private"
681    print "AN", s.index()
682
683    #create domain
684    try: 
685        s.create_domain("test") 
686    except:
687        print "Failed to create the domain"
688        pass
689
690    files = s.listfiles('test')
691    print "Files in test", files
692
693    print "uploading"
694
695    #upload this file - save it by a different name
696    s.upload('test', 'osf-test.py', 'orngServerFiles.py', title="NT", tags=["fkdl","fdl"])
697    #make it public
698    s.unprotect('test', 'osf-test.py')
699
700    #login anonymously
701    s = ServerFiles()
702
703    #list files in the domain "test"
704    files = s.listfiles('test')
705    print "ALL FILES:", files
706
707    for f in files:
708        fi = s.info('test', f) 
709        print "--------------------------------------", f
710        print "INFO", fi
711        print s.downloadFH('test', f).read()[:100] #show first 100 characters
712        print "--------------------------------------"
713
714    #login as an authenticated user
715    s = ServerFiles(username=myusername, password=mypassword)
716
717    print s.listdomains()
718
719    s.remove('test', 'osf-test.py')
720
721    s = ServerFiles()
722
723    print s.listdomains()
724
725
if __name__ == '__main__':
    # The demo needs a user name and a password; exit with a usage message
    # instead of an IndexError when they are missing.
    if len(sys.argv) < 3:
        sys.exit("Usage: %s USERNAME PASSWORD" % sys.argv[0])
    example(sys.argv[1], sys.argv[2])
Note: See TracBrowser for help on using the repository browser.