Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added WriteConcern as a param for dataset #3

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions fiftyone/core/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import warnings

from bson import ObjectId
from pymongo import InsertOne, UpdateOne, UpdateMany
from pymongo import InsertOne, UpdateOne, UpdateMany, WriteConcern

import eta.core.serial as etas
import eta.core.utils as etau
Expand Down Expand Up @@ -9126,7 +9126,9 @@ def get_index_information(self, include_stats=False):

return index_info

def create_index(self, field_or_spec, unique=False, **kwargs):
def create_index(
self, field_or_spec, unique=False, acknowledged=True, **kwargs
):
"""Creates an index on the given field or with the given specification,
if necessary.

Expand Down Expand Up @@ -9160,6 +9162,8 @@ def create_index(self, field_or_spec, unique=False, **kwargs):
:meth:`pymongo:pymongo.collection.Collection.create_index` for
supported values
unique (False): whether to add a uniqueness constraint to the index
acknowledged (True): whether to wait and acknowledge index
creation
**kwargs: optional keyword arguments for
:meth:`pymongo:pymongo.collection.Collection.create_index`

Expand Down Expand Up @@ -9238,10 +9242,17 @@ def create_index(self, field_or_spec, unique=False, **kwargs):
# Satisfactory index already exists
return index_name

# Setting `w=0` sets `acknowledged=False` in pymongo
write_concern = WriteConcern(w=0) if not acknowledged else None

if is_frame_index:
coll = self._dataset._frame_collection
coll = self._dataset._get_frame_collection(
write_concern=write_concern
)
else:
coll = self._dataset._sample_collection
coll = self._dataset._get_sample_collection(
write_concern=write_concern
)

name = coll.create_index(index_spec, unique=unique, **kwargs)

Expand Down
37 changes: 32 additions & 5 deletions fiftyone/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@
import cachetools
from deprecated import deprecated
import mongoengine.errors as moe
from pymongo import DeleteMany, InsertOne, ReplaceOne, UpdateMany, UpdateOne
from pymongo import (
DeleteMany,
InsertOne,
ReplaceOne,
UpdateMany,
UpdateOne,
WriteConcern,
)
from pymongo.collection import Collection
from pymongo.errors import CursorNotFound, BulkWriteError

import eta.core.serial as etas
Expand Down Expand Up @@ -322,6 +330,7 @@ def __init__(
self._run_cache = cachetools.LRUCache(5)

self._deleted = False
self._write_concern = None

if not _virtual:
self._update_last_loaded_at()
Expand Down Expand Up @@ -1172,14 +1181,22 @@ def stats(

def _sample_collstats(self):
conn = foo.get_db_conn()
return conn.command("collstats", self._sample_collection_name)
return conn.command(
"collstats",
self._sample_collection_name,
write_concern=self._write_concern,
)

def _frame_collstats(self):
if self._frame_collection_name is None:
return None

conn = foo.get_db_conn()
return conn.command("collstats", self._frame_collection_name)
return conn.command(
"collstats",
self._frame_collection_name,
write_concern=self._write_concern,
)

def first(self):
"""Returns the first sample in the dataset.
Expand Down Expand Up @@ -7023,18 +7040,28 @@ def _sample_collection_name(self):

@property
def _sample_collection(self):
return foo.get_db_conn()[self._sample_collection_name]
return self._get_sample_collection(write_concern=self._write_concern)

def _get_sample_collection(self, write_concern=None) -> Collection:
return foo.get_db_conn().get_collection(
self._sample_collection_name, write_concern=write_concern
)

@property
def _frame_collection_name(self):
return self._doc.frame_collection_name

@property
def _frame_collection(self):
return self._get_frame_collection(write_concern=self._write_concern)

def _get_frame_collection(self, write_concern=None) -> Collection:
if self._frame_collection_name is None:
return None

return foo.get_db_conn()[self._frame_collection_name]
return foo.get_db_conn().get_collection(
self._frame_collection_name, write_concern=write_concern
)

@property
def _frame_indexes(self):
Expand Down