source: orange-bioinformatics/_bioinformatics/obiKEGG/pathway.py @ 1736:b3b664abcfdd

Revision 1736:b3b664abcfdd, 10.1 KB checked in by Ales Erjavec <ales.erjavec@…>, 13 months ago (diff)

Added sphinx rst documentation for obiKEGG.

RevLine 
[1532]1"""
[1736]2============
3KEGG Pathway
4============
[1532]5
6"""
7from __future__ import absolute_import
8
9import os
10import urllib2
11
12import xml.parsers
13from xml.dom import minidom
14
15from contextlib import closing
16
[1734]17from Orange.utils import deprecated_attribute
18
[1532]19from . import conf
20from . import caching
[1733]21from . import api
[1532]22
[1733]23
[1532]24def cached_method(func, cache_name="_cached_method_cache", store=None):
25    def wrapper(self, *args, **kwargs):
26        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
27        if not hasattr(self, cache_name):
28            setattr(self, cache_name, store() if store is not None else {})
29        if sig not in getattr(self, cache_name):
30            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
31        return getattr(self, cache_name)[sig]
32    return wrapper
33
[1734]34
[1532]35class Pathway(object):
[1736]36    """
37    Class representing a KEGG Pathway (parsed from a "kgml" file)
38
39    :param str pathway_id: A KEGG pathway id (e.g. 'path:hsa05130')
40
41    """
[1532]42    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
[1734]43
[1532]44    def __init__(self, pathway_id, local_cache=None, connection=None):
45        if pathway_id.startswith("path:"):
46            _, pathway_id = pathway_id.split(":", 1)
[1734]47
[1532]48        self.pathway_id = pathway_id
49        if local_cache is None:
50            local_cache = conf.params["cache.path"]
51        self.local_cache = local_cache
52        self.connection = connection
[1734]53
[1532]54    def cache_store(self):
55        caching.touch_path(self.local_cache)
56        return caching.Sqlite3Store(os.path.join(self.local_cache,
57                                                 "pathway_store.sqlite3"))
[1734]58
[1532]59    def _open_last_modified_store(self):
60        caching.touch_dir(self.local_cache)
61        return caching.Sqlite3Store(os.path.join(self.local_cache,
[1543]62                                                 "last_modified.sqlite3"))
[1734]63
[1532]64    def _get_kgml(self):
[1736]65        """
66        Return an open kgml file for the pathway.
[1532]67        """
[1604]68        from datetime import datetime, timedelta
69        valid = False
[1734]70        local_filename = os.path.join(self.local_cache,
71                                      self.pathway_id + ".xml")
[1604]72        if os.path.exists(local_filename):
73            mtime = os.stat(local_filename).st_mtime
74            mtime = datetime.fromtimestamp(mtime)
75            now = datetime.now()
76            if conf.params["cache.invalidate"] == "always":
77                valid = False
78            elif conf.params["cache.invalidate"] == "session":
79                valid = (now - mtime) < (now - caching._SESSION_START)
80            elif conf.params["cache.invalidate"] == "daily":
81                valid = (now - mtime) < timedelta(1)
82            elif conf.params["cache.invalidate"] == "weekly":
83                valid = (now - mtime) < timedelta(7)
84            else:
85                valid = False
[1734]86
[1604]87        if not valid:
[1532]88            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
89            s = urllib2.urlopen(url)
90            contents = s.read()
[1734]91
[1532]92            with open(local_filename, "wb") as f:
93                f.write(contents)
[1734]94
[1532]95        return open(local_filename, "rb")
[1734]96
[1532]97    def _get_image_filename(self):
[1736]98        """
99        Return a filename of a local copy of the pathway image
[1532]100        """
101        # TODO: keep-alive (using httplib if it supports it)
102        # better to move all code to use requests package
[1734]103
[1532]104        url = str(self.image)
[1734]105
106        local_filename = os.path.join(self.local_cache,
107                                      self.pathway_id + ".png")
108
[1532]109        if not os.path.exists(local_filename):
110            response = urllib2.urlopen(url)
111            modified_since = response.headers.get("last-modified")
112            image = response.read()
113        else:
114            request = urllib2.Request(url)
115            with closing(self._open_last_modified_store()) as store:
116                modified_since = store.get(url, None)
[1734]117
[1532]118            request.add_header("If-Modified-Since", modified_since)
119            try:
120                response = urllib2.urlopen(request)
121            except urllib2.HTTPError, ex:
122                if ex.code == 304:
123                    return local_filename
124                else:
125                    raise
126            modified_since = response.headers.get("last-modified")
127            image = response.read()
[1734]128
[1532]129        with open(local_filename, "wb") as f:
130            f.write(image)
[1734]131
[1532]132        with closing(self._open_last_modified_store()) as store:
133            store[url] = modified_since
[1734]134
135        return local_filename
136
[1532]137    def _local_kgml_filename(self):
[1736]138        """
139        Return the local kgml xml filename for the pathway.
[1532]140        """
[1734]141        local_filename = os.path.join(self.local_cache,
142                                      self.pathway_id + ".xml")
[1532]143        return local_filename
[1734]144
[1532]145    class entry(object):
146        def __init__(self, dom_element):
147            self.__dict__.update(dom_element.attributes.items())
148            self.graphics = ()
149            self.components = []
[1736]150
151            graphics = dom_element.getElementsByTagName("graphics")[0]
152            self.graphics = dict(graphics.attributes.items())
153
154            components = dom_element.getElementsByTagName("component")
155            self.components = [node.getAttribute("id") for node in components]
[1734]156
[1532]157    class reaction(object):
158        def __init__(self, dom_element):
159            self.__dict__.update(dom_element.attributes.items())
[1736]160            self.substrates = [node.getAttribute("name") for node in
161                               dom_element.getElementsByTagName("substrate")]
162            self.products = [node.getAttribute("name") for node in
163                             dom_element.getElementsByTagName("product")]
[1734]164
[1532]165    class relation(object):
166        def __init__(self, dom_element):
167            self.__dict__.update(dom_element.attributes.items())
[1736]168            self.subtypes = [node.attributes.items() for node in
169                             dom_element.getElementsByTagName("subtype")]
[1734]170
[1532]171    @cached_method
172    def pathway_attributes(self):
173        return dict(self.pathway_dom().attributes.items())
[1734]174
[1532]175    @property
176    def name(self):
[1736]177        """
178        Pathway name/id (e.g. "path:hsa05130")
179        """
[1532]180        return self.pathway_attributes().get("name")
[1734]181
[1532]182    @property
183    def org(self):
[1736]184        """
185        Pathway organism code (e.g. 'hsa')
186        """
[1532]187        return self.pathway_attributes().get("org")
[1734]188
[1532]189    @property
190    def number(self):
[1736]191        """
192        Pathway number as a string (e.g. '05130')
193        """
[1532]194        return self.pathway_attributes().get("number")
[1734]195
196    @property
[1532]197    def title(self):
[1736]198        """
199        Pathway title string.
200        """
[1532]201        return self.pathway_attributes().get("title")
[1734]202
[1532]203    @property
204    def image(self):
[1736]205        """
206        URL of the pathway image.
207        """
[1532]208        return self.pathway_attributes().get("image")
[1734]209
[1532]210    @property
211    def link(self):
[1736]212        """
213        URL to a pathway on the KEGG web site.
214        """
[1532]215        return self.pathway_attributes().get("link")
[1734]216
[1532]217    @cached_method
218    def pathway_dom(self):
219        try:
220            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
221        except xml.parsers.expat.ExpatError:
222            # TODO: Should delete the cached xml file.
223            return None
[1734]224
[1532]225    @cached_method
226    def entries(self):
227        dom = self.pathway_dom()
228        if dom:
229            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
230        else:
231            return []
[1734]232
[1532]233    entrys = deprecated_attribute("entrys", "entries")
[1734]234
[1532]235    @cached_method
236    def reactions(self):
237        dom = self.pathway_dom()
238        if dom:
239            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
240        else:
241            return []
[1734]242
[1532]243    @cached_method
244    def relations(self):
245        dom = self.pathway_dom()
246        if dom:
247            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
248        else:
249            return []
[1734]250
[1532]251    def __iter__(self):
[1736]252        """
253        Iterate over all elements in the pathway.
[1532]254        """
255        return iter(self.all_elements())
[1734]256
[1532]257    def __contains__(self, element):
[1736]258        """
259        Return ``True`` if element in the pathway.
[1532]260        """
261        return element in self.all_elements()
[1734]262
[1532]263    @classmethod
264    def split_pathway_id(cls, id):
265        path, id = id.split(":") if ":" in id else ("path", id)
266        org, id = id[:-5], id[-5:]
[1734]267        return path, org, id
268
[1532]269    @cached_method
270    def all_elements(self):
[1736]271        """
272        Return all elements
[1532]273        """
[1734]274        return reduce(list.__add__,
275                      [self.genes(), self.compounds(),
276                       self.enzmes(), self.reactions()],
277                      [])
278
[1532]279    def _get_entries_by_type(self, type):
[1734]280        return sorted(reduce(set.union,
281                             [entry.name.split() for entry in self.entries()
282                              if entry.type == type],
283                             set()))
284
[1532]285    @cached_method
286    def genes(self):
[1736]287        """
288        Return all genes on the pathway.
[1532]289        """
290        return self._get_entries_by_type("gene")
[1734]291
[1532]292    @cached_method
293    def compounds(self):
[1736]294        """
295        Return all compounds on the pathway.
[1532]296        """
297        return self._get_entries_by_type("compound")
[1734]298
[1532]299    @cached_method
300    def enzymes(self):
[1736]301        """
302        Return all enzymes on the pathway.
[1532]303        """
304        return self._get_entries_by_type("enzyme")
[1734]305
[1532]306    @cached_method
307    def orthologs(self):
[1736]308        """
309        Return all orthologs on the pathway.
[1532]310        """
311        return self._get_entries_by_type("ortholog")
[1734]312
[1532]313    @cached_method
314    def maps(self):
[1736]315        """
316        Return all linked maps on the pathway.
[1532]317        """
318        return self._get_entries_by_type("map")
[1734]319
[1532]320    @cached_method
321    def groups(self):
[1736]322        """
323        Return all groups on the pathway.
[1532]324        """
325        return self._get_entries_by_type("ortholog")
[1734]326
[1532]327    def get_image(self):
[1736]328        """
329        Return an local filesystem path to an image of the pathway. The image
330        will be downloaded if not already cached.
[1532]331        """
332        return self._get_image_filename()
[1733]333
[1532]334    @classmethod
335    def list(cls, organism):
[1736]336        """
337        List all pathways for KEGG organism code `organism`.
338        """
[1532]339        kegg = api.CachedKeggApi()
340        return kegg.list_pathways(organism)
Note: See TracBrowser for help on using the repository browser.