Skip to content

ALGO-62 Integrated mlops agent #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions adk/ADK.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import os
import sys
import Algorithmia
import os
import subprocess

from adk.io import create_exception, format_data, format_response
from adk.modeldata import ModelData
from adk.mlops import MLOps


class ADK(object):
Expand All @@ -17,6 +21,7 @@ def __init__(self, apply_func, load_func=None, client=None):
:param client: A Algorithmia Client instance that might be user defined,
and is used for interacting with a model manifest file; if defined.
"""
self.mlops = None
self.FIFO_PATH = "/tmp/algoout"

if client:
Expand All @@ -39,10 +44,8 @@ def __init__(self, apply_func, load_func=None, client=None):
self.load_result = None
self.loading_exception = None
self.manifest_path = "model_manifest.json"
self.model_data = self.init_manifest(self.manifest_path)

def init_manifest(self, path):
return ModelData(self.client, path)
self.mlops_path = "mlops.json"
self.model_data = ModelData(self.client, self.manifest_path)

def load(self):
try:
Expand Down Expand Up @@ -91,8 +94,18 @@ def write_to_pipe(self, payload, pprint=print):
def process_local(self, local_payload, pprint):
result = self.apply(local_payload)
self.write_to_pipe(result, pprint=pprint)

def mlops_init(self):
mlops_token = os.environ.get("DATAROBOT_MLOPS_API_TOKEN", None)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you aren't using the same env var as the monitoring agent itself? This might make things simpler, unless you specifically don't want them to be the same. That would be just MLOPS_API_TOKEN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah — in case there are other MLOps API tokens from different orgs, I want to be very clear about what this token is used for, so that it isn't confused with any Algorithmia token.

if mlops_token:
self.mlops = MLOps(mlops_token, self.mlops_path)
self.mlops.init()
else:
raise Exception("'DATAROBOT_MLOPS_API_TOKEN' was not found, please set to use mlops.")

def init(self, local_payload=None, pprint=print):
def init(self, local_payload=None, pprint=print, mlops=False):
if mlops and not self.is_local:
self.mlops_init()
self.load()
if self.is_local and local_payload is not None:
if self.loading_exception:
Expand Down
46 changes: 46 additions & 0 deletions adk/mlops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import yaml
import json
import os
import subprocess


class MLOps(object):
    """Configure and start the DataRobot MLOps agent for this process.

    The constructor reads connection settings from a JSON config file;
    ``init()`` exports the ``MLOPS_*`` environment variables, rewrites the
    agent's YAML configuration with the endpoint and API token, starts the
    agent and checks its reported status.
    """

    # Directory exported as MLOPS_FILESYSTEM_DIRECTORY for the FILESYSTEM spooler.
    spool_dir = "/tmp/ta"
    # Directory that must exist for the environment to count as mlops-enabled.
    agent_dir = "/opt/mlops-agent"
    # Default agent package directory under ``agent_dir``; an instance
    # overrides it with the ``mlops_dir_name`` key of its config, if present.
    mlops_dir_name = "datarobot_mlops_package-8.1.2"

    def __init__(self, api_token, path):
        """Load the mlops settings.

        :param api_token: API token written into the agent configuration.
        :param path: Path to the JSON config file. It must contain the keys
            ``datarobot_mlops_service_url``, ``model_id`` and
            ``deployment_id``; ``mlops_dir_name`` is optional.
        :raises Exception: if ``path`` does not exist, or if ``agent_dir``
            is not present on this machine.
        :raises KeyError: if a required key is missing from the config.
        """
        self.token = api_token
        if os.path.exists(path):
            with open(path) as f:
                mlops_config = json.load(f)
        else:
            raise Exception("'mlops.json' file does not exist, but mlops was requested.")
        if not os.path.exists(self.agent_dir):
            raise Exception("environment is not configured for mlops.\nPlease select a valid mlops enabled environment.")
        self.endpoint = mlops_config['datarobot_mlops_service_url']
        self.model_id = mlops_config['model_id']
        self.deployment_id = mlops_config['deployment_id']
        self.mlops_name = mlops_config.get('mlops_dir_name', self.mlops_dir_name)
        # Shadow the class-level default so that init() actually honours the
        # package directory requested in the config file.
        self.mlops_dir_name = self.mlops_name

    def init(self):
        """Configure the agent, start it and verify that it is running.

        :return: ``True`` when the status script reports a running agent.
        :raises Exception: carrying the first line of the status output when
            the agent is not reported as running, or a fixed message when
            the status script prints nothing.
        """
        os.environ['MLOPS_DEPLOYMENT_ID'] = self.deployment_id
        os.environ['MLOPS_MODEL_ID'] = self.model_id
        os.environ['MLOPS_SPOOLER_TYPE'] = "FILESYSTEM"
        os.environ['MLOPS_FILESYSTEM_DIRECTORY'] = self.spool_dir

        package_dir = os.path.join(self.agent_dir, self.mlops_dir_name)
        conf_path = os.path.join(package_dir, 'conf', 'mlops.agent.conf.yaml')

        # NOTE(review): FullLoader is kept from the original; the file is a
        # local agent config. Prefer yaml.safe_load if it could ever come
        # from an untrusted source.
        with open(conf_path) as f:
            documents = yaml.load(f, Loader=yaml.FullLoader)
        documents['mlopsUrl'] = self.endpoint
        documents['apiToken'] = self.token
        with open(conf_path, 'w') as f:
            yaml.dump(documents, f)

        subprocess.call(os.path.join(package_dir, 'bin', 'start-agent.sh'))
        check = subprocess.Popen(
            [os.path.join(package_dir, 'bin', 'status-agent.sh')],
            stdout=subprocess.PIPE,
        )
        # communicate() drains stdout, closes the pipe and reaps the child.
        stdout, _ = check.communicate()
        lines = stdout.splitlines()
        # Only the first line of the status output is inspected; an empty
        # output is treated as "not running" instead of raising IndexError.
        output = lines[0] if lines else b""
        if b"DataRobot MLOps-Agent is running as a service." in output:
            return True
        raise Exception(output or "status-agent.sh produced no output")
6 changes: 3 additions & 3 deletions adk/modeldata.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ def find_optional_model(self, file_name):
else:
self.models[file_name] = FileData(real_hash, local_data_path)


def get_manifest(self):
if os.path.exists(self.manifest_frozen_path):
with open(self.manifest_frozen_path) as f:
manifest_data = json.load(f)
if check_lock(manifest_data):
return manifest_data
else:
raise Exception("Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your "
"algorithm's freeze file.")
raise Exception(
"Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your "
"algorithm's freeze file.")
elif os.path.exists(self.manifest_reg_path):
with open(self.manifest_reg_path) as f:
manifest_data = json.load(f)
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
algorithmia>=1.7,<2
six
six
pyaml>=21.10,<21.11
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
author_email='support@algorithmia.com',
packages=['adk'],
install_requires=[
'pyaml>=21.10,<21.11',
'six',
],
include_package_data=True,
Expand Down
4 changes: 2 additions & 2 deletions tests/AdkTest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from adk import ADK

from adk.modeldata import ModelData

class ADKTest(ADK):
    """ADK variant for tests that takes the model manifest path as an argument."""

    def __init__(self, apply_func, load_func=None, client=None, manifest_path="model_manifest.json.freeze"):
        """Build the ADK, then point its model data at ``manifest_path``.

        :param apply_func: Forwarded unchanged to ``ADK.__init__``.
        :param load_func: Forwarded unchanged to ``ADK.__init__``.
        :param client: Forwarded unchanged to ``ADK.__init__``.
        :param manifest_path: Manifest file used to construct ``ModelData``.
        """
        super(ADKTest, self).__init__(apply_func, load_func, client)
        # ADK no longer exposes init_manifest(); construct ModelData directly
        # so the test harness can substitute its own manifest path.
        self.model_data = ModelData(self.client, manifest_path)