source: orange-bioinformatics/orangecontrib/bio/obiKEGG/pathway.py @ 1873:0810c5708cc5

Revision 1873:0810c5708cc5, 10.1 KB checked in by Ales Erjavec <ales.erjavec@…>, 6 months ago (diff)

Moved '_bioinformatics' into orangecontrib namespace.

Line 
1"""
2============
3KEGG Pathway
4============
5
6"""
7from __future__ import absolute_import
8
9import os
10import urllib2
11
12import xml.parsers
13from xml.dom import minidom
14
15from contextlib import closing
16
17from Orange.utils import deprecated_attribute
18
19from . import conf
20from . import caching
21from . import api
22
23
24def cached_method(func, cache_name="_cached_method_cache", store=None):
25    def wrapper(self, *args, **kwargs):
26        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
27        if not hasattr(self, cache_name):
28            setattr(self, cache_name, store() if store is not None else {})
29        if sig not in getattr(self, cache_name):
30            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
31        return getattr(self, cache_name)[sig]
32    return wrapper
33
34
35class Pathway(object):
36    """
37    Class representing a KEGG Pathway (parsed from a "kgml" file)
38
39    :param str pathway_id: A KEGG pathway id (e.g. 'path:hsa05130')
40
41    """
42    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
43
44    def __init__(self, pathway_id, local_cache=None, connection=None):
45        if pathway_id.startswith("path:"):
46            _, pathway_id = pathway_id.split(":", 1)
47
48        self.pathway_id = pathway_id
49        if local_cache is None:
50            local_cache = conf.params["cache.path"]
51        self.local_cache = local_cache
52        self.connection = connection
53
54    def cache_store(self):
55        caching.touch_path(self.local_cache)
56        return caching.Sqlite3Store(os.path.join(self.local_cache,
57                                                 "pathway_store.sqlite3"))
58
59    def _open_last_modified_store(self):
60        caching.touch_dir(self.local_cache)
61        return caching.Sqlite3Store(os.path.join(self.local_cache,
62                                                 "last_modified.sqlite3"))
63
64    def _get_kgml(self):
65        """
66        Return an open kgml file for the pathway.
67        """
68        from datetime import datetime, timedelta
69        valid = False
70        local_filename = os.path.join(self.local_cache,
71                                      self.pathway_id + ".xml")
72        if os.path.exists(local_filename):
73            mtime = os.stat(local_filename).st_mtime
74            mtime = datetime.fromtimestamp(mtime)
75            now = datetime.now()
76            if conf.params["cache.invalidate"] == "always":
77                valid = False
78            elif conf.params["cache.invalidate"] == "session":
79                valid = (now - mtime) < (now - caching._SESSION_START)
80            elif conf.params["cache.invalidate"] == "daily":
81                valid = (now - mtime) < timedelta(1)
82            elif conf.params["cache.invalidate"] == "weekly":
83                valid = (now - mtime) < timedelta(7)
84            else:
85                valid = False
86
87        if not valid:
88            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
89            s = urllib2.urlopen(url)
90            contents = s.read()
91
92            with open(local_filename, "wb") as f:
93                f.write(contents)
94
95        return open(local_filename, "rb")
96
97    def _get_image_filename(self):
98        """
99        Return a filename of a local copy of the pathway image
100        """
101        # TODO: keep-alive (using httplib if it supports it)
102        # better to move all code to use requests package
103
104        url = str(self.image)
105
106        local_filename = os.path.join(self.local_cache,
107                                      self.pathway_id + ".png")
108
109        if not os.path.exists(local_filename):
110            response = urllib2.urlopen(url)
111            modified_since = response.headers.get("last-modified")
112            image = response.read()
113        else:
114            request = urllib2.Request(url)
115            with closing(self._open_last_modified_store()) as store:
116                modified_since = store.get(url, None)
117
118            request.add_header("If-Modified-Since", modified_since)
119            try:
120                response = urllib2.urlopen(request)
121            except urllib2.HTTPError, ex:
122                if ex.code == 304:
123                    return local_filename
124                else:
125                    raise
126            modified_since = response.headers.get("last-modified")
127            image = response.read()
128
129        with open(local_filename, "wb") as f:
130            f.write(image)
131
132        with closing(self._open_last_modified_store()) as store:
133            store[url] = modified_since
134
135        return local_filename
136
137    def _local_kgml_filename(self):
138        """
139        Return the local kgml xml filename for the pathway.
140        """
141        local_filename = os.path.join(self.local_cache,
142                                      self.pathway_id + ".xml")
143        return local_filename
144
145    class entry(object):
146        def __init__(self, dom_element):
147            self.__dict__.update(dom_element.attributes.items())
148            self.graphics = ()
149            self.components = []
150
151            graphics = dom_element.getElementsByTagName("graphics")[0]
152            self.graphics = dict(graphics.attributes.items())
153
154            components = dom_element.getElementsByTagName("component")
155            self.components = [node.getAttribute("id") for node in components]
156
157    class reaction(object):
158        def __init__(self, dom_element):
159            self.__dict__.update(dom_element.attributes.items())
160            self.substrates = [node.getAttribute("name") for node in
161                               dom_element.getElementsByTagName("substrate")]
162            self.products = [node.getAttribute("name") for node in
163                             dom_element.getElementsByTagName("product")]
164
165    class relation(object):
166        def __init__(self, dom_element):
167            self.__dict__.update(dom_element.attributes.items())
168            self.subtypes = [node.attributes.items() for node in
169                             dom_element.getElementsByTagName("subtype")]
170
171    @cached_method
172    def pathway_attributes(self):
173        return dict(self.pathway_dom().attributes.items())
174
175    @property
176    def name(self):
177        """
178        Pathway name/id (e.g. "path:hsa05130")
179        """
180        return self.pathway_attributes().get("name")
181
182    @property
183    def org(self):
184        """
185        Pathway organism code (e.g. 'hsa')
186        """
187        return self.pathway_attributes().get("org")
188
189    @property
190    def number(self):
191        """
192        Pathway number as a string (e.g. '05130')
193        """
194        return self.pathway_attributes().get("number")
195
196    @property
197    def title(self):
198        """
199        Pathway title string.
200        """
201        return self.pathway_attributes().get("title")
202
203    @property
204    def image(self):
205        """
206        URL of the pathway image.
207        """
208        return self.pathway_attributes().get("image")
209
210    @property
211    def link(self):
212        """
213        URL to a pathway on the KEGG web site.
214        """
215        return self.pathway_attributes().get("link")
216
217    @cached_method
218    def pathway_dom(self):
219        try:
220            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
221        except xml.parsers.expat.ExpatError:
222            # TODO: Should delete the cached xml file.
223            return None
224
225    @cached_method
226    def entries(self):
227        dom = self.pathway_dom()
228        if dom:
229            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
230        else:
231            return []
232
233    entrys = deprecated_attribute("entrys", "entries")
234
235    @cached_method
236    def reactions(self):
237        dom = self.pathway_dom()
238        if dom:
239            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
240        else:
241            return []
242
243    @cached_method
244    def relations(self):
245        dom = self.pathway_dom()
246        if dom:
247            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
248        else:
249            return []
250
251    def __iter__(self):
252        """
253        Iterate over all elements in the pathway.
254        """
255        return iter(self.all_elements())
256
257    def __contains__(self, element):
258        """
259        Return ``True`` if element in the pathway.
260        """
261        return element in self.all_elements()
262
263    @classmethod
264    def split_pathway_id(cls, id):
265        path, id = id.split(":") if ":" in id else ("path", id)
266        org, id = id[:-5], id[-5:]
267        return path, org, id
268
269    @cached_method
270    def all_elements(self):
271        """
272        Return all elements
273        """
274        return reduce(list.__add__,
275                      [self.genes(), self.compounds(),
276                       self.enzmes(), self.reactions()],
277                      [])
278
279    def _get_entries_by_type(self, type):
280        return sorted(reduce(set.union,
281                             [entry.name.split() for entry in self.entries()
282                              if entry.type == type],
283                             set()))
284
285    @cached_method
286    def genes(self):
287        """
288        Return all genes on the pathway.
289        """
290        return self._get_entries_by_type("gene")
291
292    @cached_method
293    def compounds(self):
294        """
295        Return all compounds on the pathway.
296        """
297        return self._get_entries_by_type("compound")
298
299    @cached_method
300    def enzymes(self):
301        """
302        Return all enzymes on the pathway.
303        """
304        return self._get_entries_by_type("enzyme")
305
306    @cached_method
307    def orthologs(self):
308        """
309        Return all orthologs on the pathway.
310        """
311        return self._get_entries_by_type("ortholog")
312
313    @cached_method
314    def maps(self):
315        """
316        Return all linked maps on the pathway.
317        """
318        return self._get_entries_by_type("map")
319
320    @cached_method
321    def groups(self):
322        """
323        Return all groups on the pathway.
324        """
325        return self._get_entries_by_type("ortholog")
326
327    def get_image(self):
328        """
329        Return an local filesystem path to an image of the pathway. The image
330        will be downloaded if not already cached.
331        """
332        return self._get_image_filename()
333
334    @classmethod
335    def list(cls, organism):
336        """
337        List all pathways for KEGG organism code `organism`.
338        """
339        kegg = api.CachedKeggApi()
340        return kegg.list_pathways(organism)
Note: See TracBrowser for help on using the repository browser.