Skip to content

Commit

Permalink
Merge pull request #1259 from facebookresearch/fix-remote-procedure-v…
Browse files Browse the repository at this point in the history
…alidation-ui

Updated UI validation and fixed error handling in remote procedures
  • Loading branch information
meta-paul authored Nov 6, 2024
2 parents 70bf630 + 0dd85b3 commit 53eaae0
Show file tree
Hide file tree
Showing 29 changed files with 154 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ mephisto form_composer config --verify
# Parameters that work together
mephisto form_composer config --directory /my/own/path/to/data/ --verify
mephisto form_composer config --directory /my/own/path/to/data/ --extrapolate-token-sets
mephisto form_composer config --directory /my/own/path/to/data/ --extrapolate-token-sets --copy-config-files
mephisto form_composer config --update-file-location-values "https://s3.amazonaws.com/..."
```

where
- `-d/--directory` - a **modifier** for all `form_composer config` command options that specifies the directory where all form JSON config files are located (if missing the default is `mephisto/generators/form_composer/data` directory)
- `-c/--copy-config-files` - a **modifier** for all `form_composer config` command options that allows to copy all data files from specified in `--directory` to the generator's data directory. It can be useful, when you have a bunch of different data directories
- `-v/--verify` - if truthy, validates all JSON configs currently present in the form builder config directory
- `-p/--permutate-sepatate-tokens` - if truthy, generates token sets values as all possible combinations of values of individual tokens
- `-f/--update-file-location-values S3_FOLDER_URL` - generates token values based on file names found within the specified S3 folder (see a separate section about this mode of running FormComposer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ mephisto video_annotator config --verify
# Parameters that work together
mephisto video_annotator config --directory /my/own/path/to/data/ --verify
mephisto video_annotator config --directory /my/own/path/to/data/ --extrapolate-token-sets
mephisto video_annotator config --directory /my/own/path/to/data/ --extrapolate-token-sets --copy-config-files
mephisto video_annotator config --update-file-location-values "https://s3.amazonaws.com/..."
```

where
- `-d/--directory` - a **modifier** for all `video_annotator config` command options that specifies the directory where all annotator JSON config files are located (if missing the default is `mephisto/generators/video_annotator/data` directory)
- `-c/--copy-config-files` - a **modifier** for all `form_composer config` command options that allows to copy all data files from specified in `--directory` to the generator's data directory. It can be useful, when you have a bunch of different data directories
- `-v/--verify` - if truthy, validates all JSON configs currently present in the annotator builder config directory
- `-p/--permutate-sepatate-tokens` - if truthy, generates token sets values as all possible combinations of values of individual tokens
- `-f/--update-file-location-values S3_FOLDER_URL` - generates token values based on file names found within the specified S3 folder (see a separate section about this mode of running VideoAnnotator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,35 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import json
import time
from typing import Any
from typing import Dict
from typing import TYPE_CHECKING
from typing import Union

from mephisto.abstractions.blueprint import TaskRunner
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_agent_state import (
RemoteProcedureAgentState,
)
from mephisto.data_model.agent import Agent, OnboardingAgent
import time
import json

from uuid import uuid4

from typing import ClassVar, List, Type, Any, Dict, Union, TYPE_CHECKING
from mephisto.data_model.agent import Agent
from mephisto.data_model.agent import OnboardingAgent
from mephisto.utils import http_status
from mephisto.utils.logger_core import get_logger

if TYPE_CHECKING:
from mephisto.data_model.task_run import TaskRun
from mephisto.abstractions.blueprint import AgentState
from mephisto.data_model.assignment import Assignment
from omegaconf import DictConfig
from mephisto.data_model.unit import Unit
from mephisto.abstractions.blueprints.remote_procedure.remote_procedure_blueprint import (
SharedRemoteProcedureTaskState,
)
from mephisto.data_model.task_run import TaskRun
from mephisto.data_model.unit import Unit
from omegaconf import DictConfig


THREAD_SHORT_SLEEP = 0.3

logger = get_logger(name=__name__)


class RemoteProcedureTaskRunner(TaskRunner):
"""
Expand All @@ -45,6 +49,7 @@ def __init__(
shared_state: "SharedRemoteProcedureTaskState",
):
super().__init__(task_run, args, shared_state)

# TODO load up the possible functions from the shared_state
self.is_concurrent = False # This task is 1 person w/ backend
self.function_registry = shared_state.function_registry
Expand All @@ -57,6 +62,7 @@ def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
Return the data for an agent already assigned to a particular unit
"""
init_state = agent.state.get_init_state()

if init_state is not None:
# reconnecting agent, give what we've got
return init_state
Expand All @@ -65,7 +71,9 @@ def get_init_data_for_agent(self, agent: "Agent") -> Dict[str, Any]:
assignment_data = self.get_data_for_assignment(assignment)
agent.state.set_init_state(assignment_data.shared)
new_state = agent.state.get_init_state()

assert new_state is not None, "Recently initialized state still None"

return new_state

def _agent_in_onboarding_or_live(self, agent: Union["Agent", "OnboardingAgent"]) -> bool:
Expand All @@ -81,27 +89,49 @@ def _run_server_timestep_for_agent(self, agent: Union["Agent", "OnboardingAgent"
queries
"""
live_update = agent.get_live_update()

if live_update is not None and "request_id" in live_update:
request_id = live_update["request_id"]
# Execute commands that come in from the frontend
# TODO extend scope to handle yield-style functions, and
# move these to async tasks

error_message = (
f"Target function {live_update['target']} not found in registry: "
f"{self.function_registry}"
)
assert (
self.function_registry is not None
and live_update["target"] in self.function_registry
), f"Target function {live_update['target']} not found in registry: {self.function_registry}"
), error_message

state = agent.state

assert isinstance(
state, RemoteProcedureAgentState
state,
RemoteProcedureAgentState,
), "Must use an agent with RemoteProcedureAgentState"
res = self.function_registry[live_update["target"]](
request_id, json.loads(live_update["args"]), state
)

try:
procedure_name = live_update["target"]
procedure = self.function_registry[procedure_name]
request_arguments = json.loads(live_update["args"])
procedure_response = procedure(request_id, request_arguments, state)
except Exception as e:
# If remote procedure raises any uncaught exception, we should not skip it,
# but return a comprehensive error response.
error_message = "Unexpected error during performing remote procedure."
logger.exception(error_message)
procedure_response = {
"errors": [error_message],
"original_error_message": str(e),
"status_code": http_status.HTTP_500_INTERNAL_SERVER_ERROR,
}

agent.observe(
{
"handles": request_id,
"response": json.dumps(res),
"response": json.dumps(procedure_response),
}
)

Expand Down
2 changes: 1 addition & 1 deletion mephisto/abstractions/providers/inhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
LICENSE file in the root directory of this source tree.
-->

Inhouse provider is a local Mephisto provider in case if you want to run your tasks without any third-party provider.
Inhouse provider is a local Mephisto provider in case you want to run your tasks without any third-party provider.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def setup_resources_for_task_run(
# we need to block Mephisto workers on Prolific as well
if blocked_participant_ids:
new_prolific_specific_qualifications = []
# Add empty Blacklist in case if there is not in state or config
# Add empty Blacklist in case there is not in state or config
blacklist_qualification = DictConfig(
dict(
name=CustomBlacklistEligibilityRequirement.name,
Expand Down
2 changes: 1 addition & 1 deletion mephisto/abstractions/providers/prolific/prolific_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def pay_bonus(
client,
workspace_name=task_run_config.provider.prolific_workspace_name,
):
# Just in case if Prolific adds showing an available balance for an account
# Just in case Prolific adds showing an available balance for an account
logger.debug("Cannot pay bonus. Reason: Insufficient funds in your Prolific account.")
return False

Expand Down
17 changes: 15 additions & 2 deletions mephisto/client/cli_form_composer_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ def form_composer_cli(
"when we use `--update_file_location_values` command"
),
)
@click.option(
"-c",
"--copy-config-files",
type=bool,
default=False,
is_flag=True,
help=(
"In case if you want to copy files into data directory of generator, "
"when `--directory` parameter was used. "
"It can be useful, when you have a bunch of different data directories"
),
)
def config(
ctx: click.Context,
verify: Optional[bool] = False,
Expand All @@ -209,6 +221,7 @@ def config(
permutate_separate_tokens: Optional[bool] = False,
directory: Optional[str] = None,
use_presigned_urls: Optional[bool] = False,
copy_files: Optional[bool] = False,
):
"""
Prepare (parts of) config for the `form_composer` command.
Expand Down Expand Up @@ -346,7 +359,7 @@ def config(

logger.info(f"[green]Finished configuring all steps[/green]")

# Move generated configs to default configs dir if user specified `--directory` option.
# Copy generated configs to default configs dir if user specified `--directory` option.
# This is needed to start a generator with these new configs
if directory:
if directory and copy_files:
shutil.copytree(app_data_path, default_app_data_path, dirs_exist_ok=True)
17 changes: 15 additions & 2 deletions mephisto/client/cli_video_annotator_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ def video_annotator_cli(
"rendering when we use `--update_file_location_values` command"
),
)
@click.option(
"-c",
"--copy-config-files",
type=bool,
default=False,
is_flag=True,
help=(
"In case if you want to copy files into data directory of generator, "
"when `--directory` parameter was used. "
"It can be useful, when you have a bunch of different data directories"
),
)
def config(
ctx: click.Context,
verify: Optional[bool] = False,
Expand All @@ -191,6 +203,7 @@ def config(
permutate_separate_tokens: Optional[bool] = False,
directory: Optional[str] = None,
use_presigned_urls: Optional[bool] = False,
copy_files: Optional[bool] = False,
):
"""
Prepare (parts of) config for the `video_annotator` command.
Expand Down Expand Up @@ -330,7 +343,7 @@ def config(

logger.info(f"[green]Finished configuring all steps[/green]")

# Move generated configs to default configs dir if user specified `--directory` option.
# Copy generated configs to default configs dir if user specified `--directory` option.
# This is needed to start a generator with these new configs
if directory:
if directory and copy_files:
shutil.copytree(app_data_path, default_app_data_path, dirs_exist_ok=True)
2 changes: 1 addition & 1 deletion mephisto/generators/form_composer/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def _get_unit_data(data_browser: DataBrowser, unit: Unit) -> dict:

unit_inputs = unit_data.get("data", {}).get("inputs") or {}
unit_outputs = unit_data.get("data", {}).get("outputs") or {}
# In case if there is outdated code that returns `final_submission`
# in case there is outdated code that returns `final_submission`
# under `inputs` and `outputs` keys, we should use the value in side `final_submission`
if "final_submission" in unit_inputs:
unit_inputs = unit_inputs["final_submission"]
Expand Down
4 changes: 2 additions & 2 deletions mephisto/review_app/client/src/pages/TaskPage/TaskPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ function TaskPage(props: TaskPagePropsType) {
function setNextUnit() {
let firstWrokerUnits = workerUnits[0];

// If no Workers left (in case if we review multiple Units)
// If no Workers left (in case we review multiple Units)
if (!firstWrokerUnits) {
setFinishedTask(true);
return;
Expand All @@ -255,7 +255,7 @@ function TaskPage(props: TaskPagePropsType) {
if (firstWrokerUnits[1].length === 0) {
workerUnits.shift();

// If no Worker Units left (in case if we review Units one by one)
// If no Worker Units left (in case we review Units one by one)
if (workerUnits.length === 0) {
setFinishedTask(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get(self, task_id: str = None) -> dict:
try:
unit_data = app.data_browser.get_data_from_unit(unit)
except AssertionError:
# In case if unit does not have agent somehow
# in case unit does not have an agent somehow
unit_data = {}

task_units_data[unit_id] = unit_data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get(self, task_id: str = None) -> dict:
try:
unit_data = app.data_browser.get_data_from_unit(unit)
except AssertionError:
# In case if this is Expired Unit. It raises and axceptions
# in case this is Expired Unit. It raises and axceptions
unit_data = {}

metadata = unit_data.get("metadata", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class UnitDataStaticByFieldNameView(MethodView):
def _get_filename_by_fieldname(agent: Agent, fieldname: str) -> Union[str, None]:
unit_parsed_data = agent.state.get_parsed_data()
outputs = unit_parsed_data.get("outputs") or {}
# In case if there is outdated code that returns `final_submission`
# in case there is outdated code that returns `final_submission`
# under `outputs` key, we should use the value in side `final_submission`
if "final_submission" in outputs:
outputs = outputs["final_submission"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _get_filename_by_original_name(unit: Unit, filename: str) -> Union[str, None
if agent:
unit_parsed_data = agent.state.get_parsed_data()
outputs = unit_parsed_data.get("outputs") or {}
# In case if there is outdated code that returns `final_submission`
# in case there is outdated code that returns `final_submission`
# under `outputs` key, we should use the value in side `final_submission`
if "final_submission" in outputs:
outputs = outputs["final_submission"]
Expand Down
4 changes: 2 additions & 2 deletions mephisto/review_app/server/api/views/units_details_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def get(self) -> dict:
try:
unit_data = app.data_browser.get_data_from_unit(unit)
except AssertionError:
# In case if this is Expired Unit. It raises and axceptions
# in case this is Expired Unit. It raises and axceptions
unit_data = {}

task_run: TaskRun = unit.get_task_run()
Expand All @@ -134,7 +134,7 @@ def get(self) -> dict:
is_form_composer_task = FORM_COMPOSER_TASK_TAG in task_tags
is_video_annotator_task = VIDEO_ANNOTATOR_TASK_TAG in task_tags

# In case if there is outdated code that returns `final_submission`
# in case there is outdated code that returns `final_submission`
# under `inputs` and `outputs` keys, we should use the value in side `final_submission`
if "final_submission" in inputs:
inputs = inputs["final_submission"]
Expand Down
2 changes: 1 addition & 1 deletion mephisto/review_app/server/api/views/units_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def get(self) -> dict:
try:
unit_data = app.data_browser.get_data_from_unit(unit)
except AssertionError:
# In case if this is Expired Unit. It raises and axceptions
# in case this is Expired Unit. It raises and axceptions
unit_data = {}

bonus = unit_data.get("bonus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .base_merge_conflict_resolver import BaseMergeConflictResolver

# Import all conflict resolver classes except the base class.
# This is needed in case if user decides to write a custom class and
# This is needed in case user decides to write a custom class and
# this way its name will be available as a parameter for import command
current_dir = os.path.dirname(os.path.abspath(__file__))
for (_, module_name, _) in iter_modules([current_dir]):
Expand Down
2 changes: 1 addition & 1 deletion mephisto/tools/db_data_porter/import_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def import_single_db(
in_progress_dump_row = dump_row

# --- HACK (#UNIT.AGENT_ID) START #2:
# We save pairs `unit_id: agent_id` in case if `agent_id is not None` and
# We save pairs `unit_id: agent_id` in case `agent_id is not None` and
# replace `agent_id` with `None`
if provider_type == MEPHISTO_DUMP_KEY:
if table_name == "units" and (unit_agent_id := dump_row.get("agent_id")):
Expand Down
4 changes: 2 additions & 2 deletions mephisto/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def wrapped_fn(*args, **kwargs):
result = unreliable_fn(*args, **kwargs)
return result
except caught_excs_tuple as e:
# We can check constraint only in case if excpetion was configured well.
# We can check constraint only in case excpetion was configured well.
# Othervise, we just leave error as is
exc_message = str(getattr(e, "original_exc", None) or "")
db = getattr(e, "db", None)
Expand All @@ -670,7 +670,7 @@ def wrapped_fn(*args, **kwargs):
if db and table_name and is_pk_unique_constraint:
pk_exists = True
else:
# In case if we caught other unique constraint, reraise it
# in case we caught other unique constraint, reraise it
raise

# Set original function name to wrapped one.
Expand Down
2 changes: 1 addition & 1 deletion packages/mephisto-addons/build/bundle.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/mephisto-addons/src/FormComposer/Field.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ function Field({
`}
title={tooltip}
data-name={data.name}
data-invalid={isInvalid}
>
{data.icon && <i>{data.icon}</i>}

Expand Down
Loading

0 comments on commit 53eaae0

Please sign in to comment.