Skip to content

Commit

Permalink
feat(function): add _try command for local run
Browse files Browse the repository at this point in the history
  • Loading branch information
mostaphaRoudsari committed Jan 4, 2021
1 parent e7d7b7f commit c248110
Showing 1 changed file with 60 additions and 13 deletions.
73 changes: 60 additions & 13 deletions queenbee_dsl/function/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import inspect
import os
import subprocess
import tempfile

from dataclasses import dataclass
from typing import Any, Dict, NamedTuple
from collections import namedtuple

from queenbee.plugin.function import Function as QBFunction
from queenbee.base.parser import parse_double_quotes_vars
from queenbee_local import _copy_artifacts

from ..common import camel_to_snake, _BaseClass

Expand Down Expand Up @@ -88,31 +91,75 @@ def _outputs(self) -> NamedTuple:

return self._cached_outputs

def run(self, inputs: Dict[str, Any], run_folder: str = None) -> str:
"""Run a function locally for testing.
def _try(self, inputs: Dict[str, Any], folder: str = None) -> str:
"""Try running a function locally for testing.
This method does not return the output values.
This method does not return the output values. See the folder to validate the
outputs.
Args:
inputs: A dictionary that maps input names to values
(e.g. {'input_one': 5, ...}).
run_folder: An optional folder to run the function. A temporary folder
folder: An optional folder to run the function. A temporary folder
will be created if this folder is not provided.
Returns:
str -- path to run_folder.
"""
func = self.queenbee
# check all the required inputs are provided
for inp in func.inputs:
name = inp.name.replace('-', '_')
if inp.required:
assert name in inputs, f'Required input "{name}" is missing from inputs.'
continue
# see if default value should be used
if name not in inputs:
inputs[name] = inp.default

dst = folder or tempfile.TemporaryDirectory().name

command = ' '.join(func.command.split())
inputs = func.inputs
print(command)
print(inputs)
# check all the inputs are provided

# copy artifacts to run_folder
## use queenbee-local commands
# execute the command
# subprocess.call()

refs = parse_double_quotes_vars(command)
command = command.replace('{{', '{').replace('}}', '}')
for ref in refs:
assert ref.startswith('inputs.'), \
'All referenced values must start with {{inputs followed with' \
f' variable name. Invalid referenced value: {ref}'
var = ref.replace('inputs.', '')
command = command.replace('{%s}' % ref, str(inputs[var]))

for art in func.artifact_inputs:
print(f"copying input artifact: {art.name}...")
name = art.name.replace('-', '_')
_copy_artifacts(inputs[name], os.path.join(dst, art.path))

cur_dir = os.getcwd()
os.chdir(dst)

print(f'command: {command}')

p = subprocess.Popen(
command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True, env=os.environ
)

stdout, stderr = p.communicate()

if p.returncode != 0 and stderr != b'':
raise RuntimeError(stderr.decode('utf-8'))

if stderr.decode('utf-8'):
print(stderr.decode('utf-8'))

if stdout.decode('utf-8'):
print(stdout.decode('utf-8'))

# change back to initial directory
os.chdir(cur_dir)

return dst


def command(func):
Expand Down

0 comments on commit c248110

Please sign in to comment.