source: orange-bioinformatics/obiKEGG2/pathway.py @ 1532:14a377419a09

Revision 1532:14a377419a09, 8.7 KB checked in by ales_erjavec, 2 years ago (diff)

Added KEGG interface using web services

Line 
1"""
2KEGG Pathway (from kgml file)
3
4"""
5from __future__ import absolute_import
6
7import os
8import urllib2
9
10import xml.dom
11import xml.parsers
12from xml.dom import minidom
13
14from contextlib import closing
15
16from . import conf
17from . import caching
18
19from Orange.misc import deprecated_attribute
20
21   
22def cached_method(func, cache_name="_cached_method_cache", store=None):
23    def wrapper(self, *args, **kwargs):
24        sig = (func.__name__,) + args + tuple(sorted(kwargs.items()))
25        if not hasattr(self, cache_name):
26            setattr(self, cache_name, store() if store is not None else {})
27        if sig not in getattr(self, cache_name):
28            getattr(self, cache_name)[sig] = func(self, *args, **kwargs)
29        return getattr(self, cache_name)[sig]
30    return wrapper
31
32class Pathway(object):
33    KGML_URL_FORMAT = "http://www.genome.jp/kegg-bin/download?entry={pathway_id}&format=kgml"
34   
35    """
36    TODO: Caching should be done using a Store interface.
37   
38    """
39    def __init__(self, pathway_id, local_cache=None, connection=None):
40        if pathway_id.startswith("path:"):
41            _, pathway_id = pathway_id.split(":", 1)
42           
43        self.pathway_id = pathway_id
44        if local_cache is None:
45            local_cache = conf.params["cache.path"]
46        self.local_cache = local_cache
47        self.connection = connection
48       
49    def cache_store(self):
50        caching.touch_path(self.local_cache)
51        return caching.Sqlite3Store(os.path.join(self.local_cache,
52                                                 "pathway_store.sqlite3"))
53       
54    def _open_last_modified_store(self):
55        import shelve
56        caching.touch_dir(self.local_cache)
57        return caching.Sqlite3Store(os.path.join(self.local_cache,
58                                                 "last-modified.sqlite3"))
59       
60    def _get_kgml(self):
61        """ Return an open kgml file for the pathway.
62        """
63        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
64       
65        if not os.path.exists(local_filename):
66            url = self.KGML_URL_FORMAT.format(pathway_id=self.pathway_id)
67            s = urllib2.urlopen(url)
68            contents = s.read()
69            with open(local_filename, "wb") as f:
70                f.write(contents)
71               
72        return open(local_filename, "rb")
73   
74    def _get_image_filename(self):
75        """ Return a filename of a local copy of the pathway image
76        """
77        # TODO: keep-alive (using httplib if it supports it)
78        # better to move all code to use requests package
79       
80        url = str(self.image)
81       
82        local_filename = os.path.join(self.local_cache, self.pathway_id + ".png")
83       
84        if not os.path.exists(local_filename):
85            response = urllib2.urlopen(url)
86            modified_since = response.headers.get("last-modified")
87            image = response.read()
88        else:
89            request = urllib2.Request(url)
90            modified_cache = os.path.join(self.local_cache, "http-last-modified-cache.store")
91            with closing(self._open_last_modified_store()) as store:
92                modified_since = store.get(url, None)
93               
94            request.add_header("If-Modified-Since", modified_since)
95            try:
96                response = urllib2.urlopen(request)
97            except urllib2.HTTPError, ex:
98                if ex.code == 304:
99                    return local_filename
100                else:
101                    raise
102            modified_since = response.headers.get("last-modified")
103            image = response.read()
104               
105        with open(local_filename, "wb") as f:
106            f.write(image)
107           
108        with closing(self._open_last_modified_store()) as store:
109            store[url] = modified_since
110           
111        return local_filename
112       
113    def _local_kgml_filename(self):
114        """ Return the local kgml xml filename for the pathway.
115        """
116        local_filename = os.path.join(self.local_cache, self.pathway_id + ".xml")
117        return local_filename
118       
119    class entry(object):
120        def __init__(self, dom_element):
121            self.__dict__.update(dom_element.attributes.items())
122            self.graphics = ()
123            self.components = []
124            self.graphics = dict(dom_element.getElementsByTagName("graphics")[0].attributes.items())
125            self.components = [node.getAttribute("id") for node in dom_element.getElementsByTagName("component")]
126           
127    class reaction(object):
128        def __init__(self, dom_element):
129            self.__dict__.update(dom_element.attributes.items())
130            self.substrates = [node.getAttribute("name") for node in dom_element.getElementsByTagName("substrate")]
131            self.products = [node.getAttribute("name") for node in dom_element.getElementsByTagName("product")]
132           
133    class relation(object):
134        def __init__(self, dom_element):
135            self.__dict__.update(dom_element.attributes.items())
136            self.subtypes = [node.attributes.items() for node in dom_element.getElementsByTagName("subtype")]
137       
138    @cached_method
139    def pathway_attributes(self):
140        return dict(self.pathway_dom().attributes.items())
141   
142    @property
143    def name(self):
144        return self.pathway_attributes().get("name")
145   
146    @property
147    def org(self):
148        return self.pathway_attributes().get("org")
149   
150    @property
151    def number(self):
152        return self.pathway_attributes().get("number")
153   
154    @property   
155    def title(self):
156        return self.pathway_attributes().get("title")
157   
158    @property
159    def image(self):
160        return self.pathway_attributes().get("image")
161   
162    @property
163    def link(self):
164        return self.pathway_attributes().get("link")
165   
166    @cached_method
167    def pathway_dom(self):
168        try:
169            return minidom.parse(self._get_kgml()).getElementsByTagName("pathway")[0]
170        except xml.parsers.expat.ExpatError:
171            # TODO: Should delete the cached xml file.
172            return None
173   
174    @cached_method
175    def entries(self):
176        dom = self.pathway_dom()
177        if dom:
178            return [self.entry(e) for e in dom.getElementsByTagName("entry")]
179        else:
180            return []
181   
182    entrys = deprecated_attribute("entrys", "entries")
183   
184    @cached_method
185    def reactions(self):
186        dom = self.pathway_dom()
187        if dom:
188            return [self.reaction(e) for e in dom.getElementsByTagName("reaction")]
189        else:
190            return []
191   
192    @cached_method
193    def relations(self):
194        dom = self.pathway_dom()
195        if dom:
196            return [self.relation(e) for e in dom.getElementsByTagName("relation")]
197        else:
198            return []
199   
200    def __iter__(self):
201        """ Iterate over all elements in the pathway
202        """
203        return iter(self.all_elements())
204   
205    def __contains__(self, element):
206        """ Retrurn true if element in the pathway
207        """
208        return element in self.all_elements()
209   
210    @classmethod
211    def split_pathway_id(cls, id):
212        path, id = id.split(":") if ":" in id else ("path", id)
213        org, id = id[:-5], id[-5:]
214        return path, org, id 
215   
216    @cached_method
217    def all_elements(self):
218        """ Return all elements
219        """
220        return reduce(list.__add__, [self.genes(), self.compounds(), self.enzmes(), self.reactions()], [])
221   
222    def _get_entries_by_type(self, type):
223        return reduce(set.union, [entry.name.split() for entry in self.entries() if entry.type == type], set())
224   
225    @cached_method
226    def genes(self):
227        """ Return all genes on the pathway
228        """
229        return self._get_entries_by_type("gene")
230   
231    @cached_method
232    def compounds(self):
233        """ Return all compounds on the pathway
234        """
235        return self._get_entries_by_type("compound")
236   
237    @cached_method
238    def enzymes(self):
239        """ Return all enzymes on the pathway
240        """
241        return self._get_entries_by_type("enzyme")
242   
243    @cached_method
244    def orthologs(self):
245        """ Return all orthologs on the pathway
246        """
247        return self._get_entries_by_type("ortholog")
248   
249    @cached_method
250    def maps(self):
251        """ Return all linked maps on the pathway
252        """
253        return self._get_entries_by_type("map")
254   
255    @cached_method
256    def groups(self):
257        """ Return all groups on the pathway
258        """
259        return self._get_entries_by_type("ortholog")
260   
261    def get_image(self):
262        """ Return an image of the pathway
263        """
264        return self._get_image_filename()
265   
266    @classmethod
267    def list(cls, organism):
268        from . import api
269        kegg = api.CachedKeggApi()
270        return kegg.list_pathways(organism)
271   
Note: See TracBrowser for help on using the repository browser.