source: orange-bioinformatics/obiKEGG2/brite.py @ 1532:14a377419a09

Revision 1532:14a377419a09, 2.5 KB checked in by ales_erjavec, 2 years ago (diff)

Added KEGG interface using web services

Line 
1"""
2KEGG brite
3
4"""
5from __future__ import absolute_import
6
7import os
8import re
9import urllib2
10
11from Orange.misc import deprecated_attribute
12
13from . import conf
14class BriteEntry(object):
15    _search_re = {"ids": re.compile('(?P<ids>\[.*:.*\])'),
16                  "title": re.compile(r'(<[Bb]>)?(?P<title>\b[a-zA-Z0-9_/\s,;:.+=\-\[\]{}\(\)]+?)(?(1)</[Bb]>)$'),
17                  "links": re.compile('(?P<links><a href=".+?">.*?</a>)')}
18    def __init__(self, line):
19        self.entries = []
20        self.line = line[1:].strip()
21        for name, re in self._search_re.items():
22            search = re.search(self.line)
23            setattr(self, name, search.group(name) if search else None)
24
25    def __iter__(self):
26        return iter(self.entries)
27
28    entrys = deprecated_attribute("entrys", "entries")
29
30class Brite(BriteEntry):
31    VERSION = "v1.0"
32    BRITE_URL_FORMAT = "http://www.genome.jp/kegg-bin/download_htext?htext={brite_id}.keg&format=htext&filedir="
33   
34    def __init__(self, brite_id, local_cache=None):
35        super(Brite, self).__init__("")
36        self.brite_id = id
37        if local_cache is None:
38            local_cache = conf.params["cache.path"]
39        self.local_cache = local_cache
40       
41        self.load(brite_id)
42   
43    def _get_brite(self, brite_id):
44        url = self.BRITE_URL_FORMAT.format(brite_id=brite_id)
45        local_filename = os.path.join(self.local_cache, brite_id + ".keg")
46        if not os.path.exists(local_filename):
47            brite = urllib2.urlopen(url).read()
48            with open(local_filename, "wb") as f:
49                f.write(brite)
50               
51        return open(local_filename, "rb")
52       
53    def load(self, brite_id):
54        lines = self._get_brite(brite_id).read().split("\n!\n")[1].splitlines()
55       
56        # TODO: Implement a proper parser
57       
58        def collect(lines, depth, collection):
59            while lines:
60                line = lines[0]
61                if line.startswith("#"):
62                    lines.pop(0)
63                elif line.startswith(depth) and len(line.strip()) > 1:
64                    collection.append(BriteEntry(lines.pop(0))) 
65                elif line[0] > depth:
66                    collect(lines, line[0], collection[-1].entries)
67                elif line[0] < depth:
68                    return
69                else:
70                    lines.pop(0)
71                       
72        collect([line for line in lines if not line.startswith("#") and len(line) > 1], "A", self.entries)
Note: See TracBrowser for help on using the repository browser.