source: orange-bioinformatics/Orange/bioinformatics/obiKEGG2/pathway.py @ 1625:cefeb35cbfc9

Revision 1625:cefeb35cbfc9, 9.3 KB checked in by mitar, 2 years ago (diff)

Moving files around.

Line 
1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.dom
11import xml.parsers
12from xml.dom import minidom
13
14from contextlib import closing
15
16from . import conf
17from . import caching
18
19from Orange.utils import deprecated_attribute
20   
21def cached_method(func, cache_name="_cached_method_cache", store=None):
22    def wrapper(self, *args, **kwargs):
23        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
24        if not hasattr(self, cache_name):
25            setattr(self, cache_name, store() if store is not None else {})
26        if sig not in getattr(self, cache_name):
27            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
28        return getattr(self, cache_name)[sig]
29    return wrapper
30
31class Pathway(object):
32    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
33   
34    def __init__(self, pathway_id, local_cache=None, connection=None):
35        if pathway_id.startswith("path:"):
36            _, pathway_id = pathway_id.split(":", 1)
37           
38        self.pathway_id = pathway_id
39        if local_cache is None:
40            local_cache = conf.params["cache.path"]
41        self.local_cache = local_cache
42        self.connection = connection
43       
44    def cache_store(self):
45        caching.touch_path(self.local_cache)
46        return caching.Sqlite3Store(os.path.join(self.local_cache,
47                                                 "pathway_store.sqlite3"))
48       
49    def _open_last_modified_store(self):
50        import shelve
51        caching.touch_dir(self.local_cache)
52        return caching.Sqlite3Store(os.path.join(self.local_cache,
53                                                 "last_modified.sqlite3"))
54       
55    def _get_kgml(self):
56        """ Return an open kgml file for the pathway.
57        """
58        from datetime import datetime, timedelta
59        valid = False
60        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
61        if os.path.exists(local_filename):
62            mtime = os.stat(local_filename).st_mtime
63            mtime = datetime.fromtimestamp(mtime)
64            now = datetime.now()
65            if conf.params["cache.invalidate"] == "always":
66                valid = False
67            elif conf.params["cache.invalidate"] == "session":
68                valid = (now - mtime) < (now - caching._SESSION_START)
69            elif conf.params["cache.invalidate"] == "daily":
70                valid = (now - mtime) < timedelta(1)
71            elif conf.params["cache.invalidate"] == "weekly":
72                valid = (now - mtime) < timedelta(7)
73            else:
74                valid = False
75       
76        if not valid:
77            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
78            s = urllib2.urlopen(url)
79            contents = s.read()
80           
81            with open(local_filename, "wb") as f:
82                f.write(contents)
83               
84        return open(local_filename, "rb")
85   
86    def _get_image_filename(self):
87        """ Return a filename of a local copy of the pathway image
88        """
89        # TODO: keep-alive (using httplib if it supports it)
90        # better to move all code to use requests package
91       
92        url = str(self.image)
93       
94        local_filename = os.path.join(self.local_cache, self.pathway_id + ".png")
95       
96        if not os.path.exists(local_filename):
97            response = urllib2.urlopen(url)
98            modified_since = response.headers.get("last-modified")
99            image = response.read()
100        else:
101            request = urllib2.Request(url)
102            modified_cache = os.path.join(self.local_cache, "http-last-modified-cache.store")
103            with closing(self._open_last_modified_store()) as store:
104                modified_since = store.get(url, None)
105               
106            request.add_header("If-Modified-Since", modified_since)
107            try:
108                response = urllib2.urlopen(request)
109            except urllib2.HTTPError, ex:
110                if ex.code == 304:
111                    return local_filename
112                else:
113                    raise
114            modified_since = response.headers.get("last-modified")
115            image = response.read()
116               
117        with open(local_filename, "wb") as f:
118            f.write(image)
119           
120        with closing(self._open_last_modified_store()) as store:
121            store[url] = modified_since
122           
123        return local_filename
124       
125    def _local_kgml_filename(self):
126        """ Return the local kgml xml filename for the pathway.
127        """
128        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
129        return local_filename
130       
131    class entry(object):
132        def __init__(self, dom_element):
133            self.__dict__.update(dom_element.attributes.items())
134            self.graphics = ()
135            self.components = []
136            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
137            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
138           
139    class reaction(object):
140        def __init__(self, dom_element):
141            self.__dict__.update(dom_element.attributes.items())
142            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
143            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
144           
145    class relation(object):
146        def __init__(self, dom_element):
147            self.__dict__.update(dom_element.attributes.items())
148            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
149       
150    @cached_method
151    def pathway_attributes(self):
152        return dict(self.pathway_dom().attributes.items())
153   
154    @property
155    def name(self):
156        return self.pathway_attributes().get("name")
157   
158    @property
159    def org(self):
160        return self.pathway_attributes().get("org")
161   
162    @property
163    def number(self):
164        return self.pathway_attributes().get("number")
165   
166    @property   
167    def title(self):
168        return self.pathway_attributes().get("title")
169   
170    @property
171    def image(self):
172        return self.pathway_attributes().get("image")
173   
174    @property
175    def link(self):
176        return self.pathway_attributes().get("link")
177   
178    @cached_method
179    def pathway_dom(self):
180        try:
181            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
182        except xml.parsers.expat.ExpatError:
183            # TODO: Should delete the cached xml file.
184            return None
185   
186    @cached_method
187    def entries(self):
188        dom = self.pathway_dom()
189        if dom:
190            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
191        else:
192            return []
193   
194    entrys = deprecated_attribute("entrys", "entries")
195   
196    @cached_method
197    def reactions(self):
198        dom = self.pathway_dom()
199        if dom:
200            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
201        else:
202            return []
203   
204    @cached_method
205    def relations(self):
206        dom = self.pathway_dom()
207        if dom:
208            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
209        else:
210            return []
211   
212    def __iter__(self):
213        """ Iterate over all elements in the pathway
214        """
215        return iter(self.all_elements())
216   
217    def __contains__(self, element):
218        """ Retrurn true if element in the pathway
219        """
220        return element in self.all_elements()
221   
222    @classmethod
223    def split_pathway_id(cls, id):
224        path, id = id.split(":") if ":" in id else ("path", id)
225        org, id = id[:-5], id[-5:]
226        return path, org, id 
227   
228    @cached_method
229    def all_elements(self):
230        """ Return all elements
231        """
232        return reduce(list.__add__, [self.genes(), self.compounds(), self.enzmes(), self.reactions()], [])
233   
234    def _get_entries_by_type(self, type):
235        return sorted(reduce(set.union, [entry.name.split() for entry in self.entries() if entry.type == type], set()))
236   
237    @cached_method
238    def genes(self):
239        """ Return all genes on the pathway
240        """
241        return self._get_entries_by_type("gene")
242   
243    @cached_method
244    def compounds(self):
245        """ Return all compounds on the pathway
246        """
247        return self._get_entries_by_type("compound")
248   
249    @cached_method
250    def enzymes(self):
251        """ Return all enzymes on the pathway
252        """
253        return self._get_entries_by_type("enzyme")
254   
255    @cached_method
256    def orthologs(self):
257        """ Return all orthologs on the pathway
258        """
259        return self._get_entries_by_type("ortholog")
260   
261    @cached_method
262    def maps(self):
263        """ Return all linked maps on the pathway
264        """
265        return self._get_entries_by_type("map")
266   
267    @cached_method
268    def groups(self):
269        """ Return all groups on the pathway
270        """
271        return self._get_entries_by_type("ortholog")
272   
273    def get_image(self):
274        """ Return an image of the pathway
275        """
276        return self._get_image_filename()
277   
278    @classmethod
279    def list(cls, organism):
280        from . import api
281        kegg = api.CachedKeggApi()
282        return kegg.list_pathways(organism)
283   
Note: See TracBrowser for help on using the repository browser.