source: orange-bioinformatics/obiKEGG2/pathway.py @ 1601:d3a65a71731e

Revision 1601:d3a65a71731e, 8.7 KB checked in by markotoplak, 2 years ago (diff)

Fixes supporting the move of (most of) Orange.misc to Orange.utils.

Line 
1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.dom
11import xml.parsers
12from xml.dom import minidom
13
14from contextlib import closing
15
16from . import conf
17from . import caching
18
19from Orange.utils import deprecated_attribute
20   
21def cached_method(func, cache_name="_cached_method_cache", store=None):
22    def wrapper(self, *args, **kwargs):
23        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
24        if not hasattr(self, cache_name):
25            setattr(self, cache_name, store() if store is not None else {})
26        if sig not in getattr(self, cache_name):
27            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
28        return getattr(self, cache_name)[sig]
29    return wrapper
30
31class Pathway(object):
32    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
33   
34    """
35    TODO: Caching should be done using a Store interface.
36   
37    """
38    def __init__(self, pathway_id, local_cache=None, connection=None):
39        if pathway_id.startswith("path:"):
40            _, pathway_id = pathway_id.split(":", 1)
41           
42        self.pathway_id = pathway_id
43        if local_cache is None:
44            local_cache = conf.params["cache.path"]
45        self.local_cache = local_cache
46        self.connection = connection
47       
48    def cache_store(self):
49        caching.touch_path(self.local_cache)
50        return caching.Sqlite3Store(os.path.join(self.local_cache,
51                                                 "pathway_store.sqlite3"))
52       
53    def _open_last_modified_store(self):
54        import shelve
55        caching.touch_dir(self.local_cache)
56        return caching.Sqlite3Store(os.path.join(self.local_cache,
57                                                 "last_modified.sqlite3"))
58       
59    def _get_kgml(self):
60        """ Return an open kgml file for the pathway.
61        """
62        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
63       
64        if not os.path.exists(local_filename):
65            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
66            s = urllib2.urlopen(url)
67            contents = s.read()
68            with open(local_filename, "wb") as f:
69                f.write(contents)
70               
71        return open(local_filename, "rb")
72   
73    def _get_image_filename(self):
74        """ Return a filename of a local copy of the pathway image
75        """
76        # TODO: keep-alive (using httplib if it supports it)
77        # better to move all code to use requests package
78       
79        url = str(self.image)
80       
81        local_filename = os.path.join(self.local_cache, self.pathway_id + ".png")
82       
83        if not os.path.exists(local_filename):
84            response = urllib2.urlopen(url)
85            modified_since = response.headers.get("last-modified")
86            image = response.read()
87        else:
88            request = urllib2.Request(url)
89            modified_cache = os.path.join(self.local_cache, "http-last-modified-cache.store")
90            with closing(self._open_last_modified_store()) as store:
91                modified_since = store.get(url, None)
92               
93            request.add_header("If-Modified-Since", modified_since)
94            try:
95                response = urllib2.urlopen(request)
96            except urllib2.HTTPError, ex:
97                if ex.code == 304:
98                    return local_filename
99                else:
100                    raise
101            modified_since = response.headers.get("last-modified")
102            image = response.read()
103               
104        with open(local_filename, "wb") as f:
105            f.write(image)
106           
107        with closing(self._open_last_modified_store()) as store:
108            store[url] = modified_since
109           
110        return local_filename
111       
112    def _local_kgml_filename(self):
113        """ Return the local kgml xml filename for the pathway.
114        """
115        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
116        return local_filename
117       
118    class entry(object):
119        def __init__(self, dom_element):
120            self.__dict__.update(dom_element.attributes.items())
121            self.graphics = ()
122            self.components = []
123            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
124            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
125           
126    class reaction(object):
127        def __init__(self, dom_element):
128            self.__dict__.update(dom_element.attributes.items())
129            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
130            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
131           
132    class relation(object):
133        def __init__(self, dom_element):
134            self.__dict__.update(dom_element.attributes.items())
135            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
136       
137    @cached_method
138    def pathway_attributes(self):
139        return dict(self.pathway_dom().attributes.items())
140   
141    @property
142    def name(self):
143        return self.pathway_attributes().get("name")
144   
145    @property
146    def org(self):
147        return self.pathway_attributes().get("org")
148   
149    @property
150    def number(self):
151        return self.pathway_attributes().get("number")
152   
153    @property   
154    def title(self):
155        return self.pathway_attributes().get("title")
156   
157    @property
158    def image(self):
159        return self.pathway_attributes().get("image")
160   
161    @property
162    def link(self):
163        return self.pathway_attributes().get("link")
164   
165    @cached_method
166    def pathway_dom(self):
167        try:
168            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
169        except xml.parsers.expat.ExpatError:
170            # TODO: Should delete the cached xml file.
171            return None
172   
173    @cached_method
174    def entries(self):
175        dom = self.pathway_dom()
176        if dom:
177            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
178        else:
179            return []
180   
181    entrys = deprecated_attribute("entrys", "entries")
182   
183    @cached_method
184    def reactions(self):
185        dom = self.pathway_dom()
186        if dom:
187            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
188        else:
189            return []
190   
191    @cached_method
192    def relations(self):
193        dom = self.pathway_dom()
194        if dom:
195            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
196        else:
197            return []
198   
199    def __iter__(self):
200        """ Iterate over all elements in the pathway
201        """
202        return iter(self.all_elements())
203   
204    def __contains__(self, element):
205        """ Retrurn true if element in the pathway
206        """
207        return element in self.all_elements()
208   
209    @classmethod
210    def split_pathway_id(cls, id):
211        path, id = id.split(":") if ":" in id else ("path", id)
212        org, id = id[:-5], id[-5:]
213        return path, org, id 
214   
215    @cached_method
216    def all_elements(self):
217        """ Return all elements
218        """
219        return reduce(list.__add__, [self.genes(), self.compounds(), self.enzmes(), self.reactions()], [])
220   
221    def _get_entries_by_type(self, type):
222        return sorted(reduce(set.union, [entry.name.split() for entry in self.entries() if entry.type == type], set()))
223   
224    @cached_method
225    def genes(self):
226        """ Return all genes on the pathway
227        """
228        return self._get_entries_by_type("gene")
229   
230    @cached_method
231    def compounds(self):
232        """ Return all compounds on the pathway
233        """
234        return self._get_entries_by_type("compound")
235   
236    @cached_method
237    def enzymes(self):
238        """ Return all enzymes on the pathway
239        """
240        return self._get_entries_by_type("enzyme")
241   
242    @cached_method
243    def orthologs(self):
244        """ Return all orthologs on the pathway
245        """
246        return self._get_entries_by_type("ortholog")
247   
248    @cached_method
249    def maps(self):
250        """ Return all linked maps on the pathway
251        """
252        return self._get_entries_by_type("map")
253   
254    @cached_method
255    def groups(self):
256        """ Return all groups on the pathway
257        """
258        return self._get_entries_by_type("ortholog")
259   
260    def get_image(self):
261        """ Return an image of the pathway
262        """
263        return self._get_image_filename()
264   
265    @classmethod
266    def list(cls, organism):
267        from . import api
268        kegg = api.CachedKeggApi()
269        return kegg.list_pathways(organism)
270   
Note: See TracBrowser for help on using the repository browser.