-
Notifications
You must be signed in to change notification settings - Fork 601
/
query_object.py
executable file
·108 lines (86 loc) · 2.88 KB
/
query_object.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import abc
import logging
import os
import json
import shutil
import cloudpickle as _cloudpickle
logger = logging.getLogger(__name__)
class QueryObject(abc.ABC):
"""
Derived class needs to implement the following interface:
* query() -- given input, return query result
* get_doc_string() -- returns documentation for the Query Object
"""
def __init__(self, description=""):
self.description = description
def get_dependencies(self):
"""All endpoints this endpoint depends on"""
return []
@abc.abstractmethod
def query(self, input):
"""execute query on the provided input"""
pass
@abc.abstractmethod
def get_doc_string(self):
"""Returns documentation for the query object
By default, this method returns the docstring for 'query' method
Derived class may overwrite this method to dynamically create docstring
"""
pass
def save(self, path):
""" Save query object to the given local path
Parameters
----------
path : str
The location to save the query object to
"""
if os.path.exists(path):
logger.warning(
f'Overwriting existing file "{path}" when saving query object'
)
rm_fn = os.remove if os.path.isfile(path) else shutil.rmtree
rm_fn(path)
self._save_local(path)
def _save_local(self, path):
"""Save current query object to local path
"""
try:
os.makedirs(path)
except OSError as e:
import errno
if e.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
with open(os.path.join(path, "pickle_archive"), "wb") as f:
_cloudpickle.dump(self, f)
@classmethod
def load(cls, path):
""" Load query object from given path
"""
new_po = None
new_po = cls._load_local(path)
logger.info(f'Loaded query object "{type(new_po).__name__}" successfully')
return new_po
@classmethod
def _load_local(cls, path):
path = os.path.abspath(os.path.expanduser(path))
with open(os.path.join(path, "pickle_archive"), "rb") as f:
return _cloudpickle.load(f)
@classmethod
def _make_serializable(cls, result):
"""Convert a result from object query to python data structure that can
easily serialize over network
"""
try:
json.dumps(result)
except TypeError:
raise TypeError(
"Result from object query is not json serializable: " f"{result}"
)
return result
# Returns an array of dictionary that contains the methods and their
# corresponding schema information.
@abc.abstractmethod
def get_methods(self):
return None