Add detailed docstrings and usage examples to Ersilia's serve modules (#1439)

* Merge sample command with the example command (#1422)

* Merge sample command with the example command

* Fix example command usage

* Generalize Standard Run (#1411)

* Modify header calculation to choose between the predefined example output file and the standard example output file

* Remove the readiness function from SCRA; it is redundant since those checks are also performed by the amenable function

* Remove unused method

* Make csv serialization work for any kind of model api response

* Remove the standard flag from the CLI since it is now the default run

* Update tests

* Unnecessary files removed

* Unnecessary files removed

* Unnecessary files removed

* Unnecessary files removed

* Unnecessary files removed

* Some import cleanup

* Add detailed docstrings and usage examples to Ersilia's serve modules

* Few code fixes

* Few code fixes

* Few code fixes

* Few code fixes for requested changes

* Few code fixes for requested changes

---------

Co-authored-by: Dhanshree Arora <DhanshreeA@users.noreply.github.com>
Abellegese and DhanshreeA authored Dec 16, 2024
1 parent 3d161cc commit 21b8315
Showing 6 changed files with 1,183 additions and 121 deletions.
224 changes: 133 additions & 91 deletions ersilia/serve/api.py
@@ -18,6 +18,30 @@


class Api(object):
"""
Class to interact with the API for a given model.
Parameters
----------
model_id : str
The ID of the model.
url : str
The URL of the API.
api_name : str
The name of the API.
save_to_lake : bool
Whether to save results to the data lake.
config_json : dict
Configuration in JSON format.
Examples
--------
.. code-block:: python
api = Api(model_id='eosxxxx', url='http://0.0.0.0:25512/', api_name='run', save_to_lake=True, config_json={})
result = api.post(input='input.json', output='output.csv', batch_size=10)
"""

def __init__(self, model_id, url, api_name, save_to_lake, config_json):
self.config_json = config_json
self.model_id = model_id
@@ -31,7 +55,7 @@ def __init__(self, model_id, url, api_name, save_to_lake, config_json):
model_id=model_id, api_name=api_name, config_json=config_json
)
self.save_to_lake = save_to_lake
if url[-1] == "/":
if (url[-1] == "/"):
self.url = url[:-1]
else:
self.url = url
@@ -99,7 +123,6 @@ def _do_post(self, input, output):
def _post(self, input, output):
result = self._do_post(input, output)
if result is None and self._batch_size > 1 and len(input) > 1:
# if batch predictions didn't work, do one by one
self.logger.warning(
"Batch prediction didn't seem to work. Doing predictions one by one..."
)
@@ -114,12 +137,66 @@ def _post(self, input, output):
result = json.dumps(result, indent=4)
result = self.__result_returner(result, output)
if result is None and len(input) == 1:
# if the only one prediction did not work, return empty
result = [{"input": input[0], "output": self._empty_output}]
result = json.dumps(result, indent=4)
result = self.__result_returner(result, output)
return result

def post(self, input, output, batch_size):
"""
Post input data to the API and get the result.
Parameters
----------
input : str
The input data file or data.
output : str
The output data file.
batch_size : int
The batch size for processing.
Yields
------
dict
The result of the API call.
"""
if self._is_input_file(input):
if not os.path.exists(input):
raise InputFileNotFoundError(file_name=input)
self.logger.debug("Posting to {0}".format(self.api_name))
self.logger.debug("Batch size {0}".format(batch_size))
unique_input, mapping = self._unique_input(input)
results_ = {}
for res in self.post_unique_input(
input=unique_input, output=None, batch_size=batch_size
):
for i in mapping[res["input"]["key"]]:
results_[i] = res
self.logger.debug("Done with unique posting")
sorted_idxs = sorted(results_.keys())
results = [results_[i] for i in sorted_idxs]
if output is not None:
results = json.dumps(results)
self.output_adapter.adapt(
results, output, model_id=self.model_id, api_name=self.api_name
)
for o in [output]:
yield o
else:
for result in results:
yield result

def meta(self):
"""
Get metadata from the output adapter.
Returns
-------
dict
Metadata information.
"""
return self.output_adapter.meta()

def post_only_calculations(self, input, output, batch_size):
self._batch_size = batch_size
if output is not None:
@@ -144,11 +221,6 @@ def post_only_calculations(self, input, output, batch_size):
for r in result:
yield r

def _post_reads(self, input, output):
results = self.lake.read(input)
results = json.dumps(results, indent=4)
return self.__result_returner(results, output)

def post_only_reads(self, input, output, batch_size):
self._batch_size = batch_size
if output is not None:
@@ -173,52 +245,6 @@ def post_only_reads(self, input, output, batch_size):
for r in result:
yield r

def _write_done_todo_file(self, cur_idx, filename, data):
with open(filename, "a+") as f:
writer = csv.writer(f)
for d in data:
idx = cur_idx + d["idx"]
key = d["key"]
inp = d["input"]
txt = d["text"]
writer.writerow([idx, key, inp, txt])

def _process_done_todo_results(
self, done_input, todo_input, done_output, todo_output
):
mapping = {}
if done_output is not None:
with open(done_input, "r") as f:
reader = csv.reader(f)
for i, r in enumerate(reader):
mapping[int(r[0])] = (i, True)
with open(done_output, "r") as f:
done_output_data = json.load(f)
else:
done_output_data = {}
if todo_output is not None:
with open(todo_input, "r") as f:
reader = csv.reader(f)
for i, r in enumerate(reader):
mapping[int(r[0])] = (i, False)
with open(todo_output, "r") as f:
todo_output_data = json.load(f)
else:
todo_output_data = {}
for j in range(len(mapping)):
i, is_done = mapping[j]
if is_done:
yield done_output_data[i]
else:
yield todo_output_data[i]

@staticmethod
def __is_empty_file(filename):
if os.stat(filename).st_size == 0:
return True
else:
return False

def post_amenable_to_h5(self, input, output, batch_size):
self.logger.debug(
"Checking for already available calculations in the data lake"
@@ -278,16 +304,6 @@ def post_amenable_to_h5(self, input, output, batch_size):
for result in results:
yield result

def _unique_input(self, input):
mapping = collections.defaultdict(list)
unique_input = []
for i, inp in enumerate(self.input_adapter.adapt_one_by_one(input)):
key = inp["key"]
if key not in mapping:
unique_input += [inp]
mapping[key] += [i]
return unique_input, mapping

def post_unique_input(self, input, output, batch_size):
schema = ApiSchema(model_id=self.model_id, config_json=self.config_json)
if (
@@ -313,32 +329,58 @@ def _is_input_file(self, input):
return True
return False

def post(self, input, output, batch_size):
if self._is_input_file(input):
if not os.path.exists(input):
raise InputFileNotFoundError(file_name=input)
self.logger.debug("Posting to {0}".format(self.api_name))
self.logger.debug("Batch size {0}".format(batch_size))
unique_input, mapping = self._unique_input(input)
results_ = {}
for res in self.post_unique_input(
input=unique_input, output=None, batch_size=batch_size
):
for i in mapping[res["input"]["key"]]:
results_[i] = res
self.logger.debug("Done with unique posting")
sorted_idxs = sorted(results_.keys())
results = [results_[i] for i in sorted_idxs]
if output is not None:
results = json.dumps(results)
self.output_adapter.adapt(
results, output, model_id=self.model_id, api_name=self.api_name
)
for o in [output]:
yield o
def _unique_input(self, input):
mapping = collections.defaultdict(list)
unique_input = []
for i, inp in enumerate(self.input_adapter.adapt_one_by_one(input)):
key = inp["key"]
if key not in mapping:
unique_input += [inp]
mapping[key] += [i]
return unique_input, mapping

def _write_done_todo_file(self, cur_idx, filename, data):
with open(filename, "a+") as f:
writer = csv.writer(f)
for d in data:
idx = cur_idx + d["idx"]
key = d["key"]
inp = d["input"]
txt = d["text"]
writer.writerow([idx, key, inp, txt])

def _process_done_todo_results(
self, done_input, todo_input, done_output, todo_output
):
mapping = {}
if done_output is not None:
with open(done_input, "r") as f:
reader = csv.reader(f)
for i, r in enumerate(reader):
mapping[int(r[0])] = (i, True)
with open(done_output, "r") as f:
done_output_data = json.load(f)
else:
for result in results:
yield result
done_output_data = {}
if todo_output is not None:
with open(todo_input, "r") as f:
reader = csv.reader(f)
for i, r in enumerate(reader):
mapping[int(r[0])] = (i, False)
with open(todo_output, "r") as f:
todo_output_data = json.load(f)
else:
todo_output_data = {}
for j in range(len(mapping)):
i, is_done = mapping[j]
if is_done:
yield done_output_data[i]
else:
yield todo_output_data[i]

def meta(self):
return self.output_adapter.meta()
@staticmethod
def __is_empty_file(filename):
if os.stat(filename).st_size == 0:
return True
else:
return False
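
Since Api.post is a generator, callers iterate over it rather than reading a plain return value. Below is a minimal sketch of driving the class directly, assuming a server for the model is already listening at the given URL; the model ID, port, and file names are placeholders, not values from this commit:

.. code-block:: python

    from ersilia.serve.api import Api

    # Placeholder model ID and URL; a server for the model is assumed
    # to be running already (e.g. started through AutoService.serve()).
    api = Api(
        model_id="eosxxxx",
        url="http://0.0.0.0:25512",
        api_name="run",
        save_to_lake=False,
        config_json=None,
    )

    # With output=None, post() yields one result dict per input record;
    # with an output file, it yields the output file path instead.
    for result in api.post(input="input.json", output=None, batch_size=10):
        print(result)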
74 changes: 74 additions & 0 deletions ersilia/serve/autoservice.py
@@ -29,6 +29,37 @@


class AutoService(ErsiliaBase):
"""
Class to automatically determine and manage the service for a given model.
This class is responsible for selecting the appropriate service to run a model based on its configuration.
A "service" in this context refers to the environment or platform where the model will be executed, such as
a system bundle, virtual environment, Conda environment, Docker image, or a hosted service.
The class can automatically decide which service to use based on the availability and configuration of the model.
It also provides methods to start, stop, and manage the service, as well as to interact with the model's APIs.
Parameters
----------
model_id : str
The ID of the model.
service_class : str, optional
The class of the service.
config_json : dict, optional
Configuration in JSON format.
preferred_port : int, optional
The preferred port for the service.
url : str, optional
The URL of the service.
Examples
--------
.. code-block:: python
service = AutoService(model_id='model123', config_json={})
service.serve()
"""

def __init__(
self,
model_id,
@@ -263,6 +294,14 @@ def _service_class_loader(self, service_class):
raise Exception()

def get_apis(self):
"""
Get the list of APIs available for the model.
Returns
-------
list
List of API names.
"""
apis = []
with open(self.apis_list, "r") as f:
for l in f:
@@ -271,12 +310,28 @@ def get_apis(self):
return sorted(apis)

def is_available(self):
"""
Check if the service is available.
Returns
-------
bool
True if the service is available, False otherwise.
"""
if self.service is None:
return False
else:
return True

def is_served(self):
"""
Check if the service is currently being served.
Returns
-------
bool
True if the service is being served, False otherwise.
"""
tmp_file = tmp_pid_file(self.model_id)
if os.path.exists(tmp_file):
return True
@@ -363,6 +418,25 @@ def close(self):
def api(
self, api_name, input, output=DEFAULT_OUTPUT, batch_size=DEFAULT_BATCH_SIZE
):
"""
Call an API for the model.
Parameters
----------
api_name : str
The name of the API.
input : str
The input data file or data.
output : str, optional
The output data file.
batch_size : int, optional
The batch size for processing.
Yields
------
dict
The result of the API call.
"""
self.logger.debug("API: {0}".format(api_name))
self.logger.debug("MODEL ID: {0}".format(self.model_id))
self.logger.debug("SERVICE URL: {0}".format(self.service.url))
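
Putting the two modules together, the lifecycle the new docstrings describe looks roughly like this. This is a sketch only: the model ID is a placeholder, and serve(), is_served(), get_apis(), api(), and close() are the methods shown in the hunks above:

.. code-block:: python

    from ersilia.serve.autoservice import AutoService

    service = AutoService(model_id="eosxxxx")  # placeholder model ID
    service.serve()
    try:
        if service.is_served():
            print(service.get_apis())  # e.g. ["run"]
            # api() yields results the same way Api.post does
            for result in service.api(api_name="run", input="input.json"):
                print(result)
    finally:
        service.close()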