source: orange-bioinformatics/_bioinformatics/obiKEGG/pathway.py @ 1733:548d1187a29f

Revision 1733:548d1187a29f, 9.3 KB checked in by Ales Erjavec <ales.erjavec@…>, 14 months ago (diff)

Porting obiKEGG to use the new REST KEGG API.

Line 
1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.dom
11import xml.parsers
12from xml.dom import minidom
13
14from contextlib import closing
15
16from . import conf
17from . import caching
18from . import api
19
20from Orange.utils import deprecated_attribute
21
22
23def cached_method(func, cache_name="_cached_method_cache", store=None):
24    def wrapper(self, *args, **kwargs):
25        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
26        if not hasattr(self, cache_name):
27            setattr(self, cache_name, store() if store is not None else {})
28        if sig not in getattr(self, cache_name):
29            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
30        return getattr(self, cache_name)[sig]
31    return wrapper
32
33class Pathway(object):
34    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
35   
36    def __init__(self, pathway_id, local_cache=None, connection=None):
37        if pathway_id.startswith("path:"):
38            _, pathway_id = pathway_id.split(":", 1)
39           
40        self.pathway_id = pathway_id
41        if local_cache is None:
42            local_cache = conf.params["cache.path"]
43        self.local_cache = local_cache
44        self.connection = connection
45       
46    def cache_store(self):
47        caching.touch_path(self.local_cache)
48        return caching.Sqlite3Store(os.path.join(self.local_cache,
49                                                 "pathway_store.sqlite3"))
50       
51    def _open_last_modified_store(self):
52        import shelve
53        caching.touch_dir(self.local_cache)
54        return caching.Sqlite3Store(os.path.join(self.local_cache,
55                                                 "last_modified.sqlite3"))
56       
57    def _get_kgml(self):
58        """ Return an open kgml file for the pathway.
59        """
60        from datetime import datetime, timedelta
61        valid = False
62        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
63        if os.path.exists(local_filename):
64            mtime = os.stat(local_filename).st_mtime
65            mtime = datetime.fromtimestamp(mtime)
66            now = datetime.now()
67            if conf.params["cache.invalidate"] == "always":
68                valid = False
69            elif conf.params["cache.invalidate"] == "session":
70                valid = (now - mtime) < (now - caching._SESSION_START)
71            elif conf.params["cache.invalidate"] == "daily":
72                valid = (now - mtime) < timedelta(1)
73            elif conf.params["cache.invalidate"] == "weekly":
74                valid = (now - mtime) < timedelta(7)
75            else:
76                valid = False
77       
78        if not valid:
79            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
80            s = urllib2.urlopen(url)
81            contents = s.read()
82           
83            with open(local_filename, "wb") as f:
84                f.write(contents)
85               
86        return open(local_filename, "rb")
87   
88    def _get_image_filename(self):
89        """ Return a filename of a local copy of the pathway image
90        """
91        # TODO: keep-alive (using httplib if it supports it)
92        # better to move all code to use requests package
93       
94        url = str(self.image)
95       
96        local_filename = os.path.join(self.local_cache, self.pathway_id + ".png")
97       
98        if not os.path.exists(local_filename):
99            response = urllib2.urlopen(url)
100            modified_since = response.headers.get("last-modified")
101            image = response.read()
102        else:
103            request = urllib2.Request(url)
104            modified_cache = os.path.join(self.local_cache, "http-last-modified-cache.store")
105            with closing(self._open_last_modified_store()) as store:
106                modified_since = store.get(url, None)
107               
108            request.add_header("If-Modified-Since", modified_since)
109            try:
110                response = urllib2.urlopen(request)
111            except urllib2.HTTPError, ex:
112                if ex.code == 304:
113                    return local_filename
114                else:
115                    raise
116            modified_since = response.headers.get("last-modified")
117            image = response.read()
118               
119        with open(local_filename, "wb") as f:
120            f.write(image)
121           
122        with closing(self._open_last_modified_store()) as store:
123            store[url] = modified_since
124           
125        return local_filename
126       
127    def _local_kgml_filename(self):
128        """ Return the local kgml xml filename for the pathway.
129        """
130        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
131        return local_filename
132       
133    class entry(object):
134        def __init__(self, dom_element):
135            self.__dict__.update(dom_element.attributes.items())
136            self.graphics = ()
137            self.components = []
138            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
139            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
140           
141    class reaction(object):
142        def __init__(self, dom_element):
143            self.__dict__.update(dom_element.attributes.items())
144            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
145            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
146           
147    class relation(object):
148        def __init__(self, dom_element):
149            self.__dict__.update(dom_element.attributes.items())
150            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
151       
152    @cached_method
153    def pathway_attributes(self):
154        return dict(self.pathway_dom().attributes.items())
155   
156    @property
157    def name(self):
158        return self.pathway_attributes().get("name")
159   
160    @property
161    def org(self):
162        return self.pathway_attributes().get("org")
163   
164    @property
165    def number(self):
166        return self.pathway_attributes().get("number")
167   
168    @property   
169    def title(self):
170        return self.pathway_attributes().get("title")
171   
172    @property
173    def image(self):
174        return self.pathway_attributes().get("image")
175   
176    @property
177    def link(self):
178        return self.pathway_attributes().get("link")
179   
180    @cached_method
181    def pathway_dom(self):
182        try:
183            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
184        except xml.parsers.expat.ExpatError:
185            # TODO: Should delete the cached xml file.
186            return None
187   
188    @cached_method
189    def entries(self):
190        dom = self.pathway_dom()
191        if dom:
192            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
193        else:
194            return []
195   
196    entrys = deprecated_attribute("entrys", "entries")
197   
198    @cached_method
199    def reactions(self):
200        dom = self.pathway_dom()
201        if dom:
202            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
203        else:
204            return []
205   
206    @cached_method
207    def relations(self):
208        dom = self.pathway_dom()
209        if dom:
210            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
211        else:
212            return []
213   
214    def __iter__(self):
215        """ Iterate over all elements in the pathway
216        """
217        return iter(self.all_elements())
218   
219    def __contains__(self, element):
220        """ Retrurn true if element in the pathway
221        """
222        return element in self.all_elements()
223   
224    @classmethod
225    def split_pathway_id(cls, id):
226        path, id = id.split(":") if ":" in id else ("path", id)
227        org, id = id[:-5], id[-5:]
228        return path, org, id 
229   
230    @cached_method
231    def all_elements(self):
232        """ Return all elements
233        """
234        return reduce(list.__add__, [self.genes(), self.compounds(), self.enzmes(), self.reactions()], [])
235   
236    def _get_entries_by_type(self, type):
237        return sorted(reduce(set.union, [entry.name.split() for entry in self.entries() if entry.type == type], set()))
238   
239    @cached_method
240    def genes(self):
241        """ Return all genes on the pathway
242        """
243        return self._get_entries_by_type("gene")
244   
245    @cached_method
246    def compounds(self):
247        """ Return all compounds on the pathway
248        """
249        return self._get_entries_by_type("compound")
250   
251    @cached_method
252    def enzymes(self):
253        """ Return all enzymes on the pathway
254        """
255        return self._get_entries_by_type("enzyme")
256   
257    @cached_method
258    def orthologs(self):
259        """ Return all orthologs on the pathway
260        """
261        return self._get_entries_by_type("ortholog")
262   
263    @cached_method
264    def maps(self):
265        """ Return all linked maps on the pathway
266        """
267        return self._get_entries_by_type("map")
268   
269    @cached_method
270    def groups(self):
271        """ Return all groups on the pathway
272        """
273        return self._get_entries_by_type("ortholog")
274   
275    def get_image(self):
276        """ Return an image of the pathway
277        """
278        return self._get_image_filename()
279
280    @classmethod
281    def list(cls, organism):
282        kegg = api.CachedKeggApi()
283        return kegg.list_pathways(organism)
Note: See TracBrowser for help on using the repository browser.