source: orange-bioinformatics/_bioinformatics/obiKEGG/pathway.py @ 1734:91d14dd2cf0e

Revision 1734:91d14dd2cf0e, 9.2 KB checked in by Ales Erjavec <ales.erjavec@…>, 14 months ago (diff)

obiKEGG code style fixes.

Line 
1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.parsers
11from xml.dom import minidom
12
13from contextlib import closing
14
15from Orange.utils import deprecated_attribute
16
17from . import conf
18from . import caching
19from . import api
20
21
22def cached_method(func, cache_name="_cached_method_cache", store=None):
23    def wrapper(self, *args, **kwargs):
24        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
25        if not hasattr(self, cache_name):
26            setattr(self, cache_name, store() if store is not None else {})
27        if sig not in getattr(self, cache_name):
28            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
29        return getattr(self, cache_name)[sig]
30    return wrapper
31
32
33class Pathway(object):
34    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
35
36    def __init__(self, pathway_id, local_cache=None, connection=None):
37        if pathway_id.startswith("path:"):
38            _, pathway_id = pathway_id.split(":", 1)
39
40        self.pathway_id = pathway_id
41        if local_cache is None:
42            local_cache = conf.params["cache.path"]
43        self.local_cache = local_cache
44        self.connection = connection
45
46    def cache_store(self):
47        caching.touch_path(self.local_cache)
48        return caching.Sqlite3Store(os.path.join(self.local_cache,
49                                                 "pathway_store.sqlite3"))
50
51    def _open_last_modified_store(self):
52        caching.touch_dir(self.local_cache)
53        return caching.Sqlite3Store(os.path.join(self.local_cache,
54                                                 "last_modified.sqlite3"))
55
56    def _get_kgml(self):
57        """ Return an open kgml file for the pathway.
58        """
59        from datetime import datetime, timedelta
60        valid = False
61        local_filename = os.path.join(self.local_cache,
62                                      self.pathway_id + ".xml")
63        if os.path.exists(local_filename):
64            mtime = os.stat(local_filename).st_mtime
65            mtime = datetime.fromtimestamp(mtime)
66            now = datetime.now()
67            if conf.params["cache.invalidate"] == "always":
68                valid = False
69            elif conf.params["cache.invalidate"] == "session":
70                valid = (now - mtime) < (now - caching._SESSION_START)
71            elif conf.params["cache.invalidate"] == "daily":
72                valid = (now - mtime) < timedelta(1)
73            elif conf.params["cache.invalidate"] == "weekly":
74                valid = (now - mtime) < timedelta(7)
75            else:
76                valid = False
77
78        if not valid:
79            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
80            s = urllib2.urlopen(url)
81            contents = s.read()
82
83            with open(local_filename, "wb") as f:
84                f.write(contents)
85
86        return open(local_filename, "rb")
87
88    def _get_image_filename(self):
89        """ Return a filename of a local copy of the pathway image
90        """
91        # TODO: keep-alive (using httplib if it supports it)
92        # better to move all code to use requests package
93
94        url = str(self.image)
95
96        local_filename = os.path.join(self.local_cache,
97                                      self.pathway_id + ".png")
98
99        if not os.path.exists(local_filename):
100            response = urllib2.urlopen(url)
101            modified_since = response.headers.get("last-modified")
102            image = response.read()
103        else:
104            request = urllib2.Request(url)
105            with closing(self._open_last_modified_store()) as store:
106                modified_since = store.get(url, None)
107
108            request.add_header("If-Modified-Since", modified_since)
109            try:
110                response = urllib2.urlopen(request)
111            except urllib2.HTTPError, ex:
112                if ex.code == 304:
113                    return local_filename
114                else:
115                    raise
116            modified_since = response.headers.get("last-modified")
117            image = response.read()
118
119        with open(local_filename, "wb") as f:
120            f.write(image)
121
122        with closing(self._open_last_modified_store()) as store:
123            store[url] = modified_since
124
125        return local_filename
126
127    def _local_kgml_filename(self):
128        """ Return the local kgml xml filename for the pathway.
129        """
130        local_filename = os.path.join(self.local_cache,
131                                      self.pathway_id + ".xml")
132        return local_filename
133
134    class entry(object):
135        def __init__(self, dom_element):
136            self.__dict__.update(dom_element.attributes.items())
137            self.graphics = ()
138            self.components = []
139            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
140            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
141
142    class reaction(object):
143        def __init__(self, dom_element):
144            self.__dict__.update(dom_element.attributes.items())
145            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
146            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
147
148    class relation(object):
149        def __init__(self, dom_element):
150            self.__dict__.update(dom_element.attributes.items())
151            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
152
153    @cached_method
154    def pathway_attributes(self):
155        return dict(self.pathway_dom().attributes.items())
156
157    @property
158    def name(self):
159        return self.pathway_attributes().get("name")
160
161    @property
162    def org(self):
163        return self.pathway_attributes().get("org")
164
165    @property
166    def number(self):
167        return self.pathway_attributes().get("number")
168
169    @property
170    def title(self):
171        return self.pathway_attributes().get("title")
172
173    @property
174    def image(self):
175        return self.pathway_attributes().get("image")
176
177    @property
178    def link(self):
179        return self.pathway_attributes().get("link")
180
181    @cached_method
182    def pathway_dom(self):
183        try:
184            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
185        except xml.parsers.expat.ExpatError:
186            # TODO: Should delete the cached xml file.
187            return None
188
189    @cached_method
190    def entries(self):
191        dom = self.pathway_dom()
192        if dom:
193            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
194        else:
195            return []
196
197    entrys = deprecated_attribute("entrys", "entries")
198
199    @cached_method
200    def reactions(self):
201        dom = self.pathway_dom()
202        if dom:
203            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
204        else:
205            return []
206
207    @cached_method
208    def relations(self):
209        dom = self.pathway_dom()
210        if dom:
211            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
212        else:
213            return []
214
215    def __iter__(self):
216        """ Iterate over all elements in the pathway
217        """
218        return iter(self.all_elements())
219
220    def __contains__(self, element):
221        """ Retrurn true if element in the pathway
222        """
223        return element in self.all_elements()
224
225    @classmethod
226    def split_pathway_id(cls, id):
227        path, id = id.split(":") if ":" in id else ("path", id)
228        org, id = id[:-5], id[-5:]
229        return path, org, id
230
231    @cached_method
232    def all_elements(self):
233        """ Return all elements
234        """
235        return reduce(list.__add__,
236                      [self.genes(), self.compounds(),
237                       self.enzmes(), self.reactions()],
238                      [])
239
240    def _get_entries_by_type(self, type):
241        return sorted(reduce(set.union,
242                             [entry.name.split() for entry in self.entries()
243                              if entry.type == type],
244                             set()))
245
246    @cached_method
247    def genes(self):
248        """ Return all genes on the pathway
249        """
250        return self._get_entries_by_type("gene")
251
252    @cached_method
253    def compounds(self):
254        """ Return all compounds on the pathway
255        """
256        return self._get_entries_by_type("compound")
257
258    @cached_method
259    def enzymes(self):
260        """ Return all enzymes on the pathway
261        """
262        return self._get_entries_by_type("enzyme")
263
264    @cached_method
265    def orthologs(self):
266        """ Return all orthologs on the pathway
267        """
268        return self._get_entries_by_type("ortholog")
269
270    @cached_method
271    def maps(self):
272        """ Return all linked maps on the pathway
273        """
274        return self._get_entries_by_type("map")
275
276    @cached_method
277    def groups(self):
278        """ Return all groups on the pathway
279        """
280        return self._get_entries_by_type("ortholog")
281
282    def get_image(self):
283        """ Return an image of the pathway
284        """
285        return self._get_image_filename()
286
287    @classmethod
288    def list(cls, organism):
289        kegg = api.CachedKeggApi()
290        return kegg.list_pathways(organism)
Note: See TracBrowser for help on using the repository browser.