Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update oplog_manager.py #938

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion mongo_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from mongo_connector.util import log_fatal_exceptions, retry_until_ok
from mongo_connector.namespace_config import NamespaceConfig, validate_namespace_options
from mongo_connector.version import Version
from bson.codec_options import DatetimeConversion


# Monkey patch logging to add Logger.always
Expand Down Expand Up @@ -332,7 +333,8 @@ def create_authed_client(self, hosts=None, **kwargs):
new_uri = self.address
else:
new_uri = self.copy_uri_options(hosts, self.address)
client = MongoClient(new_uri, tz_aware=self.tz_aware, **kwargs)
client = MongoClient(new_uri, tz_aware=self.tz_aware, datetime_conversion=DatetimeConversion.DATETIME_CLAMP, **kwargs)

if self.auth_key is not None:
client["admin"].authenticate(self.auth_username, self.auth_key)
return client
Expand Down
14 changes: 11 additions & 3 deletions mongo_connector/oplog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ def _should_skip_entry(self, entry):
)
return True, False

# Within mongo-6 in case of update the op-log does not contain the
# complete document, so we must re-fetch it from database
if entry["op"] == "u" and 'diff' in entry["o"]:
LOG.debug("OplogThread: updating entry '%s' from "
"collection '%s'" % (entry["o2"]["_id"], entry["ns"]))
from_coll = self.get_collection(entry["ns"])
entry["o"] = from_coll.find_one({'_id': entry["o2"]["_id"]})

# Update the namespace.
entry["ns"] = namespace.dest_name

Expand Down Expand Up @@ -545,7 +553,7 @@ def get_all_ns():
if database == "config" or database == "local":
continue
coll_list = retry_until_ok(
self.primary_client[database].collection_names
self.primary_client[database].list_collection_names
)
for coll in coll_list:
# ignore system collections
Expand Down Expand Up @@ -607,7 +615,7 @@ def upsert_each(dm):
for namespace in dump_set:
from_coll = self.get_collection(namespace)
mapped_ns = self.namespace_config.map_namespace(namespace)
total_docs = retry_until_ok(from_coll.count)
total_docs = retry_until_ok(from_coll.count_documents, {})
num = None
for num, doc in enumerate(docs_to_dump(from_coll)):
try:
Expand Down Expand Up @@ -641,7 +649,7 @@ def upsert_all(dm):
try:
for namespace in dump_set:
from_coll = self.get_collection(namespace)
total_docs = retry_until_ok(from_coll.count)
total_docs = retry_until_ok(from_coll.count_documents, {})
mapped_ns = self.namespace_config.map_namespace(namespace)
LOG.info(
"Bulk upserting approximately %d docs from " "collection '%s'",
Expand Down