Convokit 3.0 Mega Pull Request #197

Merged
merged 15 commits · Jul 11, 2023
2 changes: 1 addition & 1 deletion README.md
@@ -10,7 +10,7 @@
[![Slack Community](https://img.shields.io/static/v1?logo=slack&style=flat&color=red&label=slack&message=community)](https://join.slack.com/t/convokit/shared_invite/zt-1axq34qrp-1hDXQrvSXClIbJOqw4S03Q)


This toolkit contains tools to extract conversational features and analyze social phenomena in conversations, using a [single unified interface](https://convokit.cornell.edu/documentation/architecture.html) inspired by (and compatible with) scikit-learn. Several large [conversational datasets](https://github.com/CornellNLP/ConvoKit#datasets) are included together with scripts exemplifying the use of the toolkit on these datasets. The latest version is [2.5.3](https://github.com/CornellNLP/ConvoKit/releases/tag/v2.5.2) (released 16 Jan 2022); follow the [project on GitHub](https://github.com/CornellNLP/ConvoKit) to keep track of updates.
This toolkit contains tools to extract conversational features and analyze social phenomena in conversations, using a [single unified interface](https://convokit.cornell.edu/documentation/architecture.html) inspired by (and compatible with) scikit-learn. Several large [conversational datasets](https://github.com/CornellNLP/ConvoKit#datasets) are included together with scripts exemplifying the use of the toolkit on these datasets. The latest version is [3.0.0](https://github.com/CornellNLP/ConvoKit/releases/tag/v3.0.0) (released 1 June 2023); follow the [project on GitHub](https://github.com/CornellNLP/ConvoKit) to keep track of updates.

Read our [documentation](https://convokit.cornell.edu/documentation) or try ConvoKit in our [interactive tutorial](https://colab.research.google.com/github/CornellNLP/ConvoKit/blob/master/examples/Introduction_to_ConvoKit.ipynb).

16 changes: 14 additions & 2 deletions convokit/coordination/coordination.py
@@ -1,5 +1,6 @@
from collections import defaultdict
from typing import Callable, Tuple, List, Dict, Optional, Collection, Union
import copy

import pkg_resources

@@ -108,11 +109,22 @@ def transform(self, corpus: Corpus) -> Corpus:
utterance_thresh_func=self.utterance_thresh_func,
)

# Keep a record of the score updates for all (speaker, target) pairs to avoid redundant operations
todo = {}

for (speaker, target), score in pair_scores.items():
if self.coordination_attribute_name not in speaker.meta:
speaker.meta[self.coordination_attribute_name] = {}
speaker.meta[self.coordination_attribute_name][target.id] = score

key = (speaker, target.id)
todo[key] = score

for (speaker, target), score in todo.items():
# Reassign a fresh copy rather than mutating in place, so the update
# persists in a DB-backed corpus as well as in memory
temp_dict = copy.deepcopy(speaker.meta[self.coordination_attribute_name])
temp_dict[target] = score
speaker.meta[self.coordination_attribute_name] = temp_dict
assert isinstance(speaker, Speaker)

return corpus
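The second loop above boils down to a general read-copy-reassign pattern for ConvoKit metadata. A minimal sketch of that pattern (the component, attribute, and key names are illustrative, and component.meta is assumed to support the standard mapping interface):

import copy

def set_meta_subfield(component, attribute, key, value):
    # Read the current value, mutate a copy, then reassign the whole field.
    # The reassignment goes through ConvoKitMeta.__setitem__, which routes the
    # write to the backend; mutating the returned dict in place would be
    # invisible to a DB-backed corpus.
    updated = copy.deepcopy(component.meta.get(attribute, {}))
    updated[key] = value
    component.meta[attribute] = updated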
convokit/model/backendMapper.py
@@ -6,13 +6,14 @@
import pickle


class StorageManager(metaclass=ABCMeta):
class BackendMapper(metaclass=ABCMeta):
"""
Abstraction layer for the concrete representation of data and metadata
within corpus components (e.g., Utterance text and timestamps). All requests
to access or modify CorpusComponent fields (with the exception of ID) are
actually routed through one of StorageManager's concrete subclasses. Each
subclass implements a storage backend that contains the actual data.
actually routed through one of BackendMapper's concrete subclasses. Each
subclass implements a concrete backend mapping from ConvoKit operations to actual data.
(These mappings are referred to as collections.)
"""

def __init__(self):
@@ -84,7 +85,7 @@ def delete_data(
self, component_type: str, component_id: str, property_name: Optional[str] = None
):
"""
Delete a data entry from this StorageManager for the component of type
Delete a data entry from this BackendMapper for the component of type
component_type with id component_id. If property_name is specified
delete only that property, otherwise delete the entire entry.
"""
@@ -93,7 +94,7 @@
@abstractmethod
def clear_all_data(self):
"""
Erase all data from this StorageManager (i.e., reset self.data to its
Erase all data from this BackendMapper (i.e., reset self.data to its
initial empty state; Python will garbage-collect the now-unreferenced
old data entries). This is used for cleanup after destructive Corpus
operations.
@@ -104,7 +105,7 @@ def clear_all_data(self):
def count_entries(self, component_type: str):
"""
Count the number of entries held for the specified component type by
this StorageManager instance
this BackendMapper instance
"""
return NotImplemented

@@ -117,7 +118,7 @@ def get_collection(self, component_type: str):

def purge_obsolete_entries(self, utterance_ids, conversation_ids, speaker_ids, meta_ids):
"""
Compare the entries in this StorageManager to the existing component ids
Compare the entries in this BackendMapper to the existing component ids
provided as parameters, and delete any entries that are not found in the
parameter ids.
"""
@@ -133,9 +134,9 @@ def purge_obsolete_entries(self, utterance_ids, conversation_ids, speaker_ids, m
self.delete_data(obj_type, obj_id)


class MemStorageManager(StorageManager):
class MemMapper(BackendMapper):
"""
Concrete StorageManager implementation for in-memory data storage.
Concrete BackendMapper implementation for in-memory data storage.
Collections are implemented as vanilla Python dicts.
"""

@@ -170,7 +171,7 @@ def get_data(
collection = self.get_collection(component_type)
if component_id not in collection:
raise KeyError(
f"This StorageManager does not have an entry for the {component_type} with id {component_id}."
f"This BackendMapper does not have an entry for the {component_type} with id {component_id}."
)
if property_name is None:
return collection[component_id]
@@ -190,7 +191,7 @@ def update_data(
# CorpusComponent constructor so if the ID is missing that indicates something is wrong
if component_id not in collection:
raise KeyError(
f"This StorageManager does not have an entry for the {component_type} with id {component_id}."
f"This BackendMapper does not have an entry for the {component_type} with id {component_id}."
)
collection[component_id][property_name] = new_value

@@ -200,7 +201,7 @@ def delete_data(
collection = self.get_collection(component_type)
if component_id not in collection:
raise KeyError(
f"This StorageManager does not have an entry for the {component_type} with id {component_id}."
f"This BackendMapper does not have an entry for the {component_type} with id {component_id}."
)
if property_name is None:
del collection[component_id]
@@ -215,9 +216,9 @@ def count_entries(self, component_type: str):
return len(self.get_collection(component_type))


class DBStorageManager(StorageManager):
class DBMapper(BackendMapper):
"""
Concrete StorageManager implementation for database-backed data storage.
Concrete BackendMapper implementation for database-backed data storage.
Collections are implemented as MongoDB collections.
"""

@@ -272,7 +273,7 @@ def get_data(
all_fields = collection.find_one({"_id": component_id})
if all_fields is None:
raise KeyError(
f"This StorageManager does not have an entry for the {component_type} with id {component_id}."
f"This BackendMapper does not have an entry for the {component_type} with id {component_id}."
)
if property_name is None:
# if some data is known to be binary type, unpack it
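For readers new to this abstraction: every component type ("utterance", "conversation", "speaker", "meta") maps to one collection, and all reads and writes are routed through a small set of accessor methods. A toy sketch of the contract, not the real class (the real abstract-method list is longer):

from abc import ABCMeta, abstractmethod
from typing import Optional

class ToyBackendMapper(metaclass=ABCMeta):
    # Toy illustration of the BackendMapper contract (abbreviated)

    @abstractmethod
    def get_collection(self, component_type: str) -> dict:
        """Return the id -> fields mapping for one component type."""

    def get_data(self, component_type: str, component_id: str, property_name: Optional[str] = None):
        # All reads go through the collection for that component type
        entry = self.get_collection(component_type)[component_id]
        return entry if property_name is None else entry[property_name]

    def update_data(self, component_type: str, component_id: str, property_name: str, new_value):
        # All writes are routed the same way
        self.get_collection(component_type)[component_id][property_name] = new_value

class ToyMemMapper(ToyBackendMapper):
    # In-memory collections are plain dicts, as in the real MemMapper
    def __init__(self):
        self.data = {t: {} for t in ("utterance", "conversation", "speaker", "meta")}

    def get_collection(self, component_type: str) -> dict:
        return self.data[component_type]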
12 changes: 11 additions & 1 deletion convokit/model/convoKitMeta.py
@@ -7,6 +7,7 @@
from .convoKitIndex import ConvoKitIndex
import json
from typing import Union
import copy

# See reference: https://stackoverflow.com/questions/7760916/correct-usage-of-a-getter-setter-for-dictionary-values

@@ -30,9 +31,18 @@ def storage_key(self) -> str:
return f"{self.obj_type}_{self.owner.id}"

def __getitem__(self, item):
return self._get_storage().get_data(
# In DB mode, in-place mutation of a metadata field (e.g., a dict or list value) would never
# reach the database. We align MEM-mode and DB-mode behavior by returning a deepcopy of
# mutable fields, so mutation no longer affects the stored metadata, only the returned copy.
item = self._get_storage().get_data(
"meta", self.storage_key, item, self.index.get_index(self.obj_type)
)
immutable_types = (int, float, bool, complex, str, tuple, frozenset)
if isinstance(item, immutable_types):
return item
else:
# item is mutable (e.g., dict or list): return a deepcopy so in-place edits act only on the copy
return copy.deepcopy(item)

def _get_storage(self):
# special case for Corpus meta since that's the only time owner is not a CorpusComponent
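In practice this means in-place edits of mutable metadata values no longer write through, in either backend; the supported pattern is read, modify, reassign. An illustrative snippet (the corpus, utterance id, and "tags" field are hypothetical):

utt = corpus.get_utterance("u1")

# Silently a no-op after this change: .append() mutates the returned deepcopy,
# leaving the stored metadata untouched in both MEM and DB mode
utt.meta["tags"].append("question")

# Supported pattern: read (a copy), modify, then reassign to write through
tags = utt.meta["tags"]
tags.append("question")
utt.meta["tags"] = tags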
55 changes: 30 additions & 25 deletions convokit/model/corpus.py
@@ -10,7 +10,7 @@
from .convoKitMatrix import ConvoKitMatrix
from .corpusUtil import *
from .corpus_helpers import *
from .storageManager import StorageManager
from .backendMapper import BackendMapper


class Corpus:
@@ -19,6 +19,8 @@ class Corpus:

:param filename: Path to a folder containing a Corpus or to an utterances.jsonl / utterances.json file to load
:param utterances: list of utterances to initialize Corpus from
:param db_collection_prefix: if a db backend is used, this determines how the database will be named. If not specified, a random name will be used.
:param db_host: if specified, and a db backend is used, connect to the database at this URL. If not specified, will default to the db_host in the ConvoKit global configuration file.
:param preload_vectors: list of names of vectors to be preloaded from directory; by default,
no vectors are loaded but can be loaded any time after corpus initialization (i.e. vectors are lazy-loaded).
:param utterance_start_index: if loading from directory and the corpus folder contains utterances.jsonl, specify the
@@ -36,6 +38,9 @@ class Corpus:
index.json is already accurate and disabling it will allow for a faster corpus load. This parameter is set to
True by default, i.e. type-checking is not carried out.

:param backend: specify the backend type, either "mem" or "db"; defaults to "mem".
:param backend_mapper: (advanced usage only) if provided, use this as the BackendMapper instance instead of initializing a new one.

:ivar meta_index: index of Corpus metadata
:ivar vectors: the vectors stored in the Corpus
:ivar corpus_dirpath: path to the directory the corpus was loaded from
@@ -56,24 +61,24 @@ def __init__(
exclude_speaker_meta: Optional[List[str]] = None,
exclude_overall_meta: Optional[List[str]] = None,
disable_type_check=True,
storage_type: Optional[str] = None,
storage: Optional[StorageManager] = None,
backend: Optional[str] = None,
backend_mapper: Optional[BackendMapper] = None,
):
self.config = ConvoKitConfig()
self.corpus_dirpath = get_corpus_dirpath(filename)

# configure corpus ID (optional for mem mode, required for DB mode)
if storage_type is None:
storage_type = self.config.default_storage_mode
if db_collection_prefix is None and filename is None and storage_type == "db":
if backend is None:
backend = self.config.default_storage_mode
if db_collection_prefix is None and filename is None and backend == "db":
db_collection_prefix = create_safe_id()
warn(
"You are in DB mode, but no collection prefix was specified and no filename was given from which to infer one."
"Will use a randomly generated unique prefix " + db_collection_prefix
)
self.id = get_corpus_id(db_collection_prefix, filename, storage_type)
self.storage_type = storage_type
self.storage = initialize_storage(self, storage, storage_type, db_host)
self.id = get_corpus_id(db_collection_prefix, filename, backend)
self.backend = backend
self.backend_mapper = initialize_storage(self, backend_mapper, backend, db_host)

self.meta_index = ConvoKitIndex(self)
self.meta = ConvoKitMeta(self, self.meta_index, "corpus")
@@ -91,10 +96,10 @@ def __init__(
if exclude_overall_meta is None:
exclude_overall_meta = []

if filename is not None and storage_type == "db":
if filename is not None and backend == "db":
# JSON-to-DB construction mode uses a specialized code branch, which
# optimizes for this use case by using direct batch insertions into the
# DB rather than going through the StorageManager, hence improving
# DB rather than going through the BackendMapper, hence improving
# efficiency.

with open(os.path.join(filename, "index.json"), "r") as f:
@@ -104,7 +109,7 @@
# populate the DB with the contents of the source file
ids_in_db = populate_db_from_file(
filename,
self.storage.db,
self.backend_mapper.db,
self.id,
self.meta_index,
utterance_start_index,
@@ -115,7 +120,7 @@
exclude_overall_meta,
)

# with the StorageManager's DB now populated, initialize the corresponding
# with the BackendMapper's DB now populated, initialize the corresponding
# CorpusComponent instances.
init_corpus_from_storage_manager(self, ids_in_db)

@@ -216,8 +221,8 @@ def reconnect_to_db(cls, db_collection_prefix: str, db_host: Optional[str] = Non
resume where you left off.
"""
# create a blank Corpus that will hold the data
result = cls(db_collection_prefix=db_collection_prefix, db_host=db_host, storage_type="db")
# through the constructor, the blank Corpus' StorageManager is now connected
result = cls(db_collection_prefix=db_collection_prefix, db_host=db_host, backend="db")
# through the constructor, the blank Corpus' BackendMapper is now connected
# to the DB. Next use the DB contents to populate the corpus components.
init_corpus_from_storage_manager(result)

@@ -621,7 +626,7 @@ def filter_conversations_by(self, selector: Callable[[Conversation], bool]):
meta_ids.append(convo.meta.storage_key)
for speaker in self.iter_speakers():
meta_ids.append(speaker.meta.storage_key)
self.storage.purge_obsolete_entries(
self.backend_mapper.purge_obsolete_entries(
self.get_utterance_ids(), self.get_conversation_ids(), self.get_speaker_ids(), meta_ids
)

@@ -645,8 +650,8 @@ def filter_utterances(source_corpus: "Corpus", selector: Callable[[Utterance], b
convo.meta.update(source_corpus.get_conversation(convo.id).meta)

# original Corpus is invalidated and no longer usable; clear all data from
# its now-orphaned StorageManager to avoid having duplicates in memory
source_corpus.storage.clear_all_data()
# its now-orphaned BackendMapper to avoid having duplicates in memory
source_corpus.backend_mapper.clear_all_data()

return new_corpus

@@ -720,8 +725,8 @@ def reindex_conversations(
print(missing_convo_roots)

# original Corpus is invalidated and no longer usable; clear all data from
# its now-orphaned StorageManager to avoid having duplicates in memory
source_corpus.storage.clear_all_data()
# its now-orphaned BackendMapper to avoid having duplicates in memory
source_corpus.backend_mapper.clear_all_data()

return new_corpus

@@ -1027,10 +1032,10 @@ def merge(primary: "Corpus", secondary: "Corpus", warnings: bool = True):
new_corpus.reinitialize_index()

# source corpora are now invalidated and all needed data has been copied
# into the new merged corpus; clear the source corpora's storage to
# into the new merged corpus; clear the source corpora's backend mapper to
# prevent having duplicates in memory
primary.storage.clear_all_data()
secondary.storage.clear_all_data()
primary.backend_mapper.clear_all_data()
secondary.backend_mapper.clear_all_data()

return new_corpus

@@ -1295,9 +1300,9 @@ def load_info(self, obj_type, fields=None, dir_name=None):
for field in fields:
# self.aux_info[field] = self.load_jsonlist_to_dict(
# os.path.join(dir_name, 'feat.%s.jsonl' % field))
if self.storage_type == "mem":
if self.backend == "mem":
load_info_to_mem(self, dir_name, obj_type, field)
elif self.storage_type == "db":
elif self.backend == "db":
load_info_to_db(self, dir_name, obj_type, field)

def dump_info(self, obj_type, fields, dir_name=None):
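The renamed constructor arguments in use, as a sketch (the dataset name, collection prefix, and host below are placeholders):

from convokit import Corpus, download

# In-memory backend (the default): all components live in RAM
mem_corpus = Corpus(filename=download("subreddit-Cornell"), backend="mem")

# DB backend: components are materialized in MongoDB instead of RAM
db_corpus = Corpus(
    filename=download("subreddit-Cornell"),
    backend="db",
    db_collection_prefix="cornell-demo",
    db_host="localhost:27017",
)

# Later, resume from the database without re-reading the JSON dump
resumed = Corpus.reconnect_to_db(db_collection_prefix="cornell-demo")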
4 changes: 2 additions & 2 deletions convokit/model/corpusComponent.py
@@ -48,7 +48,7 @@ def set_owner(self, owner):
self._owner = owner
if owner is not None:
# when a new owner Corpus is assigned, we must take the following steps:
# (1) transfer this component's data to the new owner's StorageManager
# (1) transfer this component's data to the new owner's BackendMapper
# (2) avoid duplicates by removing the data from the old owner (or temp storage if there was no prior owner)
# (3) reinitialize the metadata instance
data_dict = (
@@ -71,7 +71,7 @@ def set_owner(self, owner):
def init_meta(self, meta, overwrite=False):
if self._owner is None:
# ConvoKitMeta instances are not allowed for ownerless (standalone)
# components since they must be backed by a StorageManager. In this
# components since they must be backed by a BackendMapper. In this
# case we must forcibly convert the ConvoKitMeta instance to dict
if isinstance(meta, ConvoKitMeta):
meta = meta.to_dict()
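The ownership transfer described in these comments is what lets standalone components be adopted by a corpus. A small sketch (the metadata field is hypothetical):

from convokit import Corpus, Speaker, Utterance

# Ownerless components keep plain-dict metadata, since there is no
# BackendMapper yet to back a ConvoKitMeta instance
alice = Speaker(id="alice")
utt = Utterance(id="u0", speaker=alice, text="hello world")
utt.meta["lang"] = "en"

# Adopting the utterance into a Corpus triggers set_owner, which moves its
# data into the corpus's BackendMapper and reinitializes the metadata
corpus = Corpus(utterances=[utt])
assert corpus.get_utterance("u0").meta["lang"] == "en"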