source: orange-bioinformatics/_bioinformatics/obiKEGG2/entry/__init__.py @ 1636:10d234fdadb9

Revision 1636:10d234fdadb9, 4.8 KB checked in by mitar, 2 years ago (diff)

Restructuring because we will not be using namespaces.

Line 
1"""
2DBGET entry
3"""
4from __future__ import absolute_import
5
6__all__ = ["parser", "fields"]
7
8from collections import defaultdict
9
10from . import fields
11from .parser import DBGETEntryParser
12
13def entry_decorate(cls):
14    """ Decorate the DBEntry subclass with properties for acessing
15    the fields through the 'DBField._convert' interface
16     
17    """
18    reserved_names_map = {"class": "class_", "def": "def_"}
19    def construct_one(name):
20        def get(self):
21            field = getattr(self, name, None)
22            if field is not None:
23                return field._convert()
24            else:
25                return None
26        return property(get, doc=name)
27   
28    def construct_multiple(name):
29        def get(self):
30            field = getattr(self, name, None)
31            if field is not None:
32                return [f._convert() for f in field]
33            else:
34                return None
35        return property(get, doc=name)
36   
37    for name, field in cls.FIELDS:
38        name_lower = name.lower()
39        if not hasattr(cls, name_lower):
40            if name in cls.MULTIPLE_FIELDS:
41                prop = construct_multiple(name)
42            else:
43                prop = construct_one(name)
44            setattr(cls, reserved_names_map.get(name_lower, name_lower), prop)
45       
46    return cls
47
48class DBEntry(object):
49    """ A DBGET entry object.
50    """
51    FIELDS = [("ENTRY", fields.DBEntryField),
52              ]
53    MULTIPLE_FIELDS = []
54   
55    def __init__(self, text=None):
56        self._sections = {}
57        if text is not None:
58            self.parse(text)
59           
60    @property
61    def entry_key(self):
62        """ Primary entry key used for querying.
63        """
64        return self.entry.split(" ", 1)[0]
65   
66    def parse(self, text):
67        parser = DBGETEntryParser()
68        gen = parser.parse_string(text)
69        entry_start = 0
70        field_constructors = dict(self.FIELDS)
71        sections = defaultdict(list)
72        current = None
73        current_subfield = None
74        entry_fields = []
75        for (event, title, text) in gen:
76#            print event, title, text
77            if event == DBGETEntryParser.SECTION_START:
78                if title in field_constructors:
79                    ftype = field_constructors[title]
80                else:
81                    ftype = fields.DBSimpleField
82                current = ftype(text)
83                if current.TITLE is None:
84                    current.TITLE = title
85            elif event == DBGETEntryParser.SECTION_END:
86                entry_fields.append(current)
87                current = None
88            elif event == DBGETEntryParser.SUBSECTION_START:
89                current_subfield = fields.DBSimpleField(text)
90                current_subfield.TITLE = title
91                if not isinstance(current, fields.DBFieldWithSubsections):
92                    # Upgrade simple fields to FieldWithSubsection
93                    new = fields.DBFieldWithSubsections(current.text)
94                    new.TITLE = current.TITLE
95                    current = new
96                   
97            elif event == DBGETEntryParser.SUBSECTION_END:
98                current.subsections.append(current_subfield)
99                current_subfield = None
100            elif event == DBGETEntryParser.TEXT:
101                if current_subfield is not None:
102                    current_subfield.text += text
103                elif current is not None:
104                    current.text += text
105            elif event == DBGETEntryParser.ENTRY_END:
106                break
107       
108        self.fields = entry_fields
109        self._consolidate()
110       
111    def _consolidate(self):
112        """ Update mapping to field entries.
113        """
114        registered_fields = dict(self.FIELDS)
115        multiple_fields = set(self.MULTIPLE_FIELDS)
116       
117        for field in self.fields:
118            title = field.TITLE
119            if title not in registered_fields:
120                import warnings
121                warnings.warn("Nonregisterd field %r in %r" % \
122                                (title, type(self)))
123            title_lower = title.lower()
124            if title in multiple_fields:
125                if not hasattr(self, title):
126                    setattr(self, title, [])
127                getattr(self, title).append(field)
128            else:
129                setattr(self, title, field)
130       
131   
132    def __str__(self):
133        return self.format()
134   
135    def format(self, section_indent=12):
136        return "".join(f.format(section_indent)\
137                       for f in self.fields)
138       
139    def get(self, key, default=None):
140        raise NotImplementedError
141   
142        f = getattr(self, key, None)
143        if f is not None:
144            if key in self.MULTIPLE_FIELDS:
145                return [f.text for f in f]
146            else:
147                return f.text
148        else:
149            return None
Note: See TracBrowser for help on using the repository browser.