This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Crowdsourcing data compiler class for using Mephisto abstractions. (#4029)

* result compiler based on mephisto browser.

* debug after running the internal data compile.

* readme for the new class

* minor

* passing circle CI tests

* pr comments

* pr comments round 2
mojtaba-komeili authored Sep 21, 2021
1 parent c14b3e7 commit b2db317
Showing 3 changed files with 290 additions and 83 deletions.
55 changes: 54 additions & 1 deletion parlai/crowdsourcing/utils/README.md
@@ -2,7 +2,7 @@

## Overview
- `acceptability.py`: Used to ensure that a worker's messages throughout a conversation meet certain criteria (not too short, not all caps, not a lot of repetition, safety, etc.). More details about the acceptability checker can be found in the `acceptability.AcceptabilityChecker` section below.
- `analysis.py`: Abstract base classes for compiling the results of crowdsourcing runs.
- `analysis.py`: Abstract base classes for compiling the results of crowdsourcing runs. See the `analysis.py` section below.
- `frontend.py`: Method for compiling the frontend code of crowdsourcing tasks.
- `mturk.py`: Code for soft-blocking MTurk crowdsourcing workers (preventing them from working on this specific task), as well as a Hydra flag to pass in lists of workers to soft-block.
- `tests.py`: Abstract base classes for testing different categories of crowdsourcing tasks.
@@ -14,3 +14,56 @@
- Add the code for this in `.check_messages()`, inside an `if 'check_name' in violation_types:` condition
- Add the name of the check to `self.ALL_VIOLATION_TYPES`; otherwise, this check will not be recognized, and an error will be raised if the check is specified when calling `.check_messages()`!
- To use the check: add the name of the check to the `violation_types` arg when calling `.check_messages()`
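As a quick illustration of those three steps, here is a minimal, self-contained sketch of the pattern. This is not ParlAI's actual `AcceptabilityChecker`; the class body, the `too_many_exclamations` check, and the comma-joined return convention are all invented for illustration.

```python
# Hypothetical sketch of the violation-check pattern described above.
# NOT ParlAI's real AcceptabilityChecker.
class SketchAcceptabilityChecker:
    # Every recognized check must be listed here, or check_messages() raises.
    ALL_VIOLATION_TYPES = ['all_caps', 'too_many_exclamations']

    def check_messages(self, messages, violation_types):
        # Unrecognized check names raise an error, mirroring the rule above.
        unknown = set(violation_types) - set(self.ALL_VIOLATION_TYPES)
        if unknown:
            raise ValueError(f'Unrecognized violation types: {unknown}')
        violations = []
        # Each check lives in its own `if 'check_name' in violation_types:` block.
        if 'all_caps' in violation_types:
            if any(m.isupper() for m in messages):
                violations.append('all_caps')
        if 'too_many_exclamations' in violation_types:
            if any(m.count('!') > 3 for m in messages):
                violations.append('too_many_exclamations')
        # Assumed convention for this sketch: comma-joined violation names.
        return ','.join(violations)
```

Only checks named in `violation_types` run, so callers opt in to each check per task.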

## `analysis.py`

Contains abstract classes that provide the basic functionality for compiling data from a Mephisto task.
Mephisto provides two interfaces for retrieving its crowdsourced data: `MephistoDB` and `DataBrowser`.
Using `AbstractResultsCompiler`, you do not need to interact with these two modules directly---it provides an abstraction on top of both.
This class has methods such as `get_task_data` and `get_task_units`, which handle interacting with the Mephisto abstractions.
To compile the dataset from your crowdsourced Mephisto task, extend this class and implement the following methods:

* `compile_results` returns a python *dictionary* (key-value pairs) or a pandas *dataframe*. We assume that each unit of the crowdsourcing task (for example, an annotation or a conversation) has a unique id.
In the JSON format, this id is the key of the entry that holds the dialogue data for that conversation.
If the format is a dataframe, the convention is for each row to hold the data for a single utterance (interaction); hence, the conversation id needs to be stored in a column to distinguish the data from different dialogues.

* (optional) `is_unit_acceptable` helps with simple filtering and data cleanup. It receives the data from a unit of work and returns a boolean; the unit is discarded if it returns `False`.

### Example
Imagine you have a Mephisto task in which the output of each unit of work looks like this:
```.python
{
    'ID': 1234,
    'favorite_flower': 'rose',
    'favorite_season': 'winter',
    'status': 'accepted'
}
```

Let's say this task is called `flowers_crowdsourcing` and you want to discard every participant whose `status` is not "accepted".
Here is how you can have your data compiled and saved:

```.python
from parlai.crowdsourcing.utils.analysis import AbstractResultsCompiler


class MyResultsCompiler(AbstractResultsCompiler):
    def is_unit_acceptable(self, unit_data):
        return unit_data['status'] == 'accepted'

    def compile_results(self):
        data = dict()
        for work_unit in self.get_task_data():
            unit_id = work_unit.pop('ID')
            data[unit_id] = work_unit
        return data


if __name__ == '__main__':
    opt = {
        'task_name': 'flowers_crowdsourcing',
        'results_format': 'json',
        'output_folder': '/path/dataset/',
    }
    wizard_data_compiler = MyResultsCompiler(opt)
    wizard_data_compiler.compile_and_save_results()
```
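If you instead follow the dataframe convention (one row per utterance, with the conversation id in a column), the flattening step could look roughly like this. The `dialogue` field and the `units_to_dataframe` helper are hypothetical illustrations, not part of the actual API.

```python
import pandas as pd


def units_to_dataframe(units):
    """Flatten per-conversation unit data into one row per utterance (sketch)."""
    rows = []
    for unit in units:
        conv_id = unit['ID']  # assumed unique conversation id
        # 'dialogue' is a hypothetical field holding the list of utterances.
        for turn_idx, text in enumerate(unit['dialogue']):
            rows.append(
                {'conversation_id': conv_id, 'turn': turn_idx, 'text': text}
            )
    return pd.DataFrame(rows)
```

With this shape, utterances from different conversations can be distinguished by filtering on the `conversation_id` column.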
185 changes: 103 additions & 82 deletions parlai/crowdsourcing/utils/analysis.py
@@ -10,7 +10,8 @@
import os
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List
import json
from typing import Any, Dict, List, Union

import pandas as pd

@@ -46,11 +47,30 @@ def setup_args(cls):
default='csv',
help='Output format for results data',
)
parser.add_argument(
'--task-name', type=str, help='Name of the Mephisto task to open'
)
return parser

def __init__(self, opt: Opt):
self.output_folder = opt.get('output_folder')
self.results_format = opt['results_format']
self.task_name = opt['task_name']
self.output_folder = opt['output_folder']
self.results_format = opt.get('results_format', 'json')

# We lazily load these later, or inject their mock version during testing.
self._mephisto_db = None
self._mephisto_data_browser = None

def get_mephisto_data_browser(self) -> MephistoDataBrowser:
if not self._mephisto_data_browser:
db = self.get_mephisto_db()
self._mephisto_data_browser = MephistoDataBrowser(db=db)
return self._mephisto_data_browser

def get_mephisto_db(self) -> LocalMephistoDB:
if not self._mephisto_db:
self._mephisto_db = LocalMephistoDB()
return self._mephisto_db

def get_results_path_base(self) -> str:
"""
@@ -62,6 +82,48 @@ def get_results_path_base(self) -> str:
f'{self.__class__.__name__}__{now.strftime("%Y%m%d_%H%M%S")}',
)

def get_worker_name(self, worker_id: str) -> str:
"""
Gets the global id of a worker from their Mephisto worker_id.
The worker_id is the unique id that the crowdsourcing platform (e.g., Amazon
Mechanical Turk) assigns to a single human worker in its system.
"""
db = self.get_mephisto_db()
return db.get_worker(worker_id)["worker_name"]

def get_task_units(self) -> List[Unit]:
"""
Retrieves the list of work units from the Mephisto task.
"""
data_browser = self.get_mephisto_data_browser()
return data_browser.get_units_for_task_name(self.task_name)

def get_data_from_unit(self, unit: Unit) -> Dict[str, Any]:
"""
Retrieves task data for a single unit.
"""
try:
data_browser = self.get_mephisto_data_browser()
return data_browser.get_data_from_unit(unit)
except (IndexError, AssertionError) as error:
logging.error(error)
logging.warning(
f'Skipping unit {unit.db_id}. No message found for this unit.'
)

def get_task_data(self) -> List[Dict[str, Any]]:
"""
Retrieves task data for a list of Mephisto task units.
"""
task_data = []
for unit in self.get_task_units():
unit_data = self.get_data_from_unit(unit)
if unit_data and self.is_unit_acceptable(unit_data):
task_data.append(unit_data)

return task_data

def is_unit_acceptable(self, unit_data: Dict[str, Any]) -> bool:
"""
Helps filter the units that are compiled. Override to customize.
@@ -76,33 +138,63 @@ def is_unit_acceptable(self, unit_data: Dict[str, Any]) -> bool:
return True

@abstractmethod
def compile_results(self) -> pd.DataFrame:
def compile_results(self) -> Union[pd.DataFrame, Dict[str, Any]]:
"""
Method for returning the final results dataframe.
Method for returning the final results as a dataframe or a json.
For Dict output, each key is a unique identifier (e.g., Assignment ID) for a unit of
crowdsourcing work. The data for that unit is stored in the value as a dictionary.
Each row of the dataframe consists of one utterance of one conversation.
Each row of the dataframe consists of one utterance of one conversation (or one crowdsourcing interaction).
NOTE: Preference for new projects is Dict output (see the TODO below).
TODO: Only support Dict. Deprecate `pd.DataFrame` when no other code is relying on it.
"""

def _validate_compiled_result_type(self, results):
assert isinstance(results, dict) or isinstance(results, pd.DataFrame), (
'The output of result compiler needs to be a dictionary or a pandas dataframe. '
f'Found ({type(results)})'
)

def compile_and_save_results(self):
"""
Compile results and save them.
Results will be saved in the format given by --results-format.
"""
result_df = self.compile_results()
compiled_results = self.compile_results()
self._validate_compiled_result_type(compiled_results)
results_path_base = self.get_results_path_base()
results_path = f'{results_path_base}.{self.results_format}'
os.makedirs(self.output_folder, exist_ok=True)
if self.results_format == 'csv':
result_df.to_csv(results_path, index=False)
if not isinstance(compiled_results, pd.DataFrame):
logging.warning(
"The requested data output format was 'csv' while the data was compiled as a 'dict'. "
'Transforming dictionary data into pd.DataFrame using pandas.'
)
compiled_results = pd.DataFrame.from_dict(
compiled_results, orient='index'
)
compiled_results.to_csv(results_path, index=False)
elif self.results_format == 'json':
result_df.reset_index().to_json(results_path)
# Reset the index to make each row have a unique index value
if isinstance(compiled_results, pd.DataFrame):
logging.warning(
"The requested data output format was 'json' while the data was compiled as a 'dataframe'. "
'Transforming dataframe into json using pandas.'
)
# Reset the index to make each row have a unique index value
compiled_results.reset_index().to_json(results_path)
else:
with open(results_path, 'w') as fout:
fout.write(json.dumps(compiled_results))

else:
raise ValueError(
f'Results save format of "{self.results_format}" currently unsupported!'
)
print(f'Wrote results file to {results_path}.')
logging.info(f'Wrote results file to {results_path}.')


class AbstractTurnAnnotationResultsCompiler(AbstractResultsCompiler):
@@ -143,74 +235,3 @@ def __init__(self, opt: Opt):
else:
self.use_problem_buckets = False
self.problem_buckets = []


class AbstractDataBrowserResultsCompiler(AbstractResultsCompiler):
"""
Provides interface for using Mephisto's DataBrowser, DB, and their methods.
Uses Mephisto's DataBrowser to retrieve the work units and their data.
"""

@classmethod
def setup_args(cls):
parser = super().setup_args()
parser.add_argument(
'--task-name', type=str, help='Name of the Mephisto task to open'
)
return parser

def __init__(self, opt: Opt):
super().__init__(opt)
self.task_name = opt['task_name']
self._mephisto_db = None
self._mephisto_data_browser = None

def get_mephisto_data_browser(self) -> MephistoDataBrowser:
if not self._mephisto_data_browser:
db = self.get_mephisto_db()
self._mephisto_data_browser = MephistoDataBrowser(db=db)
return self._mephisto_data_browser

def get_mephisto_db(self) -> LocalMephistoDB:
if not self._mephisto_db:
self._mephisto_db = LocalMephistoDB()
return self._mephisto_db

def get_worker_name(self, worker_id: str) -> str:
"""
Gets the global (AWS) id of a worker from their Mephisto worker_id.
"""
db = self.get_mephisto_db()
return db.get_worker(worker_id)["worker_name"]

def get_task_units(self, task_name: str) -> List[Unit]:
"""
Retrieves the list of work units from the Mephisto task.
"""
data_browser = self.get_mephisto_data_browser()
return data_browser.get_units_for_task_name(task_name)

def get_data_from_unit(self, unit: Unit) -> Dict[str, Any]:
"""
Retrieves task data for a single unit.
"""
try:
data_browser = self.get_mephisto_data_browser()
return data_browser.get_data_from_unit(unit)
except (IndexError, AssertionError):
logging.warning(
f'Skipping unit {unit.db_id}. No message found for this unit.'
)

def get_units_data(self, task_units: List[Unit]) -> List[Dict[str, Any]]:
"""
Retrieves task data for a list of Mephisto task units.
"""
task_data = []
for unit in task_units:
unit_data = self.get_data_from_unit(unit)
if unit_data and self.is_unit_acceptable(unit_data):
task_data.append(unit_data)

return task_data
