source: orange-bioinformatics/_bioinformatics/obiKEGG/pathway.py @ 1734:91d14dd2cf0e

Revision 1734:91d14dd2cf0e, 9.2 KB checked in by Ales Erjavec <ales.erjavec@…>, 14 months ago (diff)

obiKEGG code style fixes.

RevLine 
[1532]1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.parsers
11from xml.dom import minidom
12
13from contextlib import closing
14
[1734]15from Orange.utils import deprecated_attribute
16
[1532]17from . import conf
18from . import caching
[1733]19from . import api
[1532]20
[1733]21
[1532]22def cached_method(func, cache_name="_cached_method_cache", store=None):
23    def wrapper(self, *args, **kwargs):
24        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
25        if not hasattr(self, cache_name):
26            setattr(self, cache_name, store() if store is not None else {})
27        if sig not in getattr(self, cache_name):
28            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
29        return getattr(self, cache_name)[sig]
30    return wrapper
31
[1734]32
[1532]33class Pathway(object):
34    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
[1734]35
[1532]36    def __init__(self, pathway_id, local_cache=None, connection=None):
37        if pathway_id.startswith("path:"):
38            _, pathway_id = pathway_id.split(":", 1)
[1734]39
[1532]40        self.pathway_id = pathway_id
41        if local_cache is None:
42            local_cache = conf.params["cache.path"]
43        self.local_cache = local_cache
44        self.connection = connection
[1734]45
[1532]46    def cache_store(self):
47        caching.touch_path(self.local_cache)
48        return caching.Sqlite3Store(os.path.join(self.local_cache,
49                                                 "pathway_store.sqlite3"))
[1734]50
[1532]51    def _open_last_modified_store(self):
52        caching.touch_dir(self.local_cache)
53        return caching.Sqlite3Store(os.path.join(self.local_cache,
[1543]54                                                 "last_modified.sqlite3"))
[1734]55
[1532]56    def _get_kgml(self):
57        """ Return an open kgml file for the pathway.
58        """
[1604]59        from datetime import datetime, timedelta
60        valid = False
[1734]61        local_filename = os.path.join(self.local_cache,
62                                      self.pathway_id + ".xml")
[1604]63        if os.path.exists(local_filename):
64            mtime = os.stat(local_filename).st_mtime
65            mtime = datetime.fromtimestamp(mtime)
66            now = datetime.now()
67            if conf.params["cache.invalidate"] == "always":
68                valid = False
69            elif conf.params["cache.invalidate"] == "session":
70                valid = (now - mtime) < (now - caching._SESSION_START)
71            elif conf.params["cache.invalidate"] == "daily":
72                valid = (now - mtime) < timedelta(1)
73            elif conf.params["cache.invalidate"] == "weekly":
74                valid = (now - mtime) < timedelta(7)
75            else:
76                valid = False
[1734]77
[1604]78        if not valid:
[1532]79            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
80            s = urllib2.urlopen(url)
81            contents = s.read()
[1734]82
[1532]83            with open(local_filename, "wb") as f:
84                f.write(contents)
[1734]85
[1532]86        return open(local_filename, "rb")
[1734]87
[1532]88    def _get_image_filename(self):
89        """ Return a filename of a local copy of the pathway image
90        """
91        # TODO: keep-alive (using httplib if it supports it)
92        # better to move all code to use requests package
[1734]93
[1532]94        url = str(self.image)
[1734]95
96        local_filename = os.path.join(self.local_cache,
97                                      self.pathway_id + ".png")
98
[1532]99        if not os.path.exists(local_filename):
100            response = urllib2.urlopen(url)
101            modified_since = response.headers.get("last-modified")
102            image = response.read()
103        else:
104            request = urllib2.Request(url)
105            with closing(self._open_last_modified_store()) as store:
106                modified_since = store.get(url, None)
[1734]107
[1532]108            request.add_header("If-Modified-Since", modified_since)
109            try:
110                response = urllib2.urlopen(request)
111            except urllib2.HTTPError, ex:
112                if ex.code == 304:
113                    return local_filename
114                else:
115                    raise
116            modified_since = response.headers.get("last-modified")
117            image = response.read()
[1734]118
[1532]119        with open(local_filename, "wb") as f:
120            f.write(image)
[1734]121
[1532]122        with closing(self._open_last_modified_store()) as store:
123            store[url] = modified_since
[1734]124
125        return local_filename
126
[1532]127    def _local_kgml_filename(self):
128        """ Return the local kgml xml filename for the pathway.
129        """
[1734]130        local_filename = os.path.join(self.local_cache,
131                                      self.pathway_id + ".xml")
[1532]132        return local_filename
[1734]133
[1532]134    class entry(object):
135        def __init__(self, dom_element):
136            self.__dict__.update(dom_element.attributes.items())
137            self.graphics = ()
138            self.components = []
139            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
140            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
[1734]141
[1532]142    class reaction(object):
143        def __init__(self, dom_element):
144            self.__dict__.update(dom_element.attributes.items())
145            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
146            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
[1734]147
[1532]148    class relation(object):
149        def __init__(self, dom_element):
150            self.__dict__.update(dom_element.attributes.items())
151            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
[1734]152
[1532]153    @cached_method
154    def pathway_attributes(self):
155        return dict(self.pathway_dom().attributes.items())
[1734]156
[1532]157    @property
158    def name(self):
159        return self.pathway_attributes().get("name")
[1734]160
[1532]161    @property
162    def org(self):
163        return self.pathway_attributes().get("org")
[1734]164
[1532]165    @property
166    def number(self):
167        return self.pathway_attributes().get("number")
[1734]168
169    @property
[1532]170    def title(self):
171        return self.pathway_attributes().get("title")
[1734]172
[1532]173    @property
174    def image(self):
175        return self.pathway_attributes().get("image")
[1734]176
[1532]177    @property
178    def link(self):
179        return self.pathway_attributes().get("link")
[1734]180
[1532]181    @cached_method
182    def pathway_dom(self):
183        try:
184            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
185        except xml.parsers.expat.ExpatError:
186            # TODO: Should delete the cached xml file.
187            return None
[1734]188
[1532]189    @cached_method
190    def entries(self):
191        dom = self.pathway_dom()
192        if dom:
193            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
194        else:
195            return []
[1734]196
[1532]197    entrys = deprecated_attribute("entrys", "entries")
[1734]198
[1532]199    @cached_method
200    def reactions(self):
201        dom = self.pathway_dom()
202        if dom:
203            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
204        else:
205            return []
[1734]206
[1532]207    @cached_method
208    def relations(self):
209        dom = self.pathway_dom()
210        if dom:
211            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
212        else:
213            return []
[1734]214
[1532]215    def __iter__(self):
216        """ Iterate over all elements in the pathway
217        """
218        return iter(self.all_elements())
[1734]219
[1532]220    def __contains__(self, element):
221        """ Retrurn true if element in the pathway
222        """
223        return element in self.all_elements()
[1734]224
[1532]225    @classmethod
226    def split_pathway_id(cls, id):
227        path, id = id.split(":") if ":" in id else ("path", id)
228        org, id = id[:-5], id[-5:]
[1734]229        return path, org, id
230
[1532]231    @cached_method
232    def all_elements(self):
233        """ Return all elements
234        """
[1734]235        return reduce(list.__add__,
236                      [self.genes(), self.compounds(),
237                       self.enzmes(), self.reactions()],
238                      [])
239
[1532]240    def _get_entries_by_type(self, type):
[1734]241        return sorted(reduce(set.union,
242                             [entry.name.split() for entry in self.entries()
243                              if entry.type == type],
244                             set()))
245
[1532]246    @cached_method
247    def genes(self):
248        """ Return all genes on the pathway
249        """
250        return self._get_entries_by_type("gene")
[1734]251
[1532]252    @cached_method
253    def compounds(self):
254        """ Return all compounds on the pathway
255        """
256        return self._get_entries_by_type("compound")
[1734]257
[1532]258    @cached_method
259    def enzymes(self):
260        """ Return all enzymes on the pathway
261        """
262        return self._get_entries_by_type("enzyme")
[1734]263
[1532]264    @cached_method
265    def orthologs(self):
266        """ Return all orthologs on the pathway
267        """
268        return self._get_entries_by_type("ortholog")
[1734]269
[1532]270    @cached_method
271    def maps(self):
272        """ Return all linked maps on the pathway
273        """
274        return self._get_entries_by_type("map")
[1734]275
[1532]276    @cached_method
277    def groups(self):
278        """ Return all groups on the pathway
279        """
280        return self._get_entries_by_type("ortholog")
[1734]281
[1532]282    def get_image(self):
283        """ Return an image of the pathway
284        """
285        return self._get_image_filename()
[1733]286
[1532]287    @classmethod
288    def list(cls, organism):
289        kegg = api.CachedKeggApi()
290        return kegg.list_pathways(organism)
Note: See TracBrowser for help on using the repository browser.