From 94639d40bbf793240fa245f7f17526f7ed6560a8 Mon Sep 17 00:00:00 2001 From: Shankari Date: Fri, 3 Jan 2025 18:46:11 -0800 Subject: [PATCH 1/2] Catch pymongo errors, log details and affected document We recently encountered an issue in which `usercache/put` calls were failing because of an error with the message format. It was not easy to determine the object that was generating the error, since we only log successful updates. This change catches all mongo errors and logs the full backtrace, including the document that generated the error. Testing done: - Added additional debugging statements to `sync_phone_to_server` in `emission/net/api/usercache.py` ``` update_query = {'user_id': uuid, 'metadata.type': data["metadata"]["type"], 'metadata.write_ts': data["metadata"]["write_ts"], 'metadata.key': data["metadata"]["key"]} + time.sleep(2) + logging.debug("After sleep, continuing to processing") ``` - Started up a local server, and logged in from the emulator - Started a trip, started location tracking and ended the trip - While the `usercache/put` was processing the entries (slowed down because of the sleep), killed the local DB - put failed with an error message highlighting the document that was being saved (although it did not matter in for this error since it was a connection error and not a document format error) ``` 2025-01-03 12:11:54,717:DEBUG:12994228224:After sleep, continuing to processing 2025-01-03 12:11:54,720:DEBUG:12994228224:Updated result for user = 34da08c9-e7a7-4f91-bf65-bf6b6d970c32, key = stats/client_time, write_ts = 1735933996.229703 = {'n': 1, 'nModified': 0, 'upserted': ObjectId('6778448afacd6df071652448'), 'ok': 1.0, 'updatedExisting': False} 2025-01-03 12:11:56,726:DEBUG:12994228224:After sleep, continuing to processing 2025-01-03 12:11:56,728:DEBUG:12994228224:Updated result for user = 34da08c9-e7a7-4f91-bf65-bf6b6d970c32, key = stats/client_time, write_ts = 1735933996.2422519 = {'n': 1, 'nModified': 0, 'upserted': 
ObjectId('6778448cfacd6df07165244a'), 'ok': 1.0, 'updatedExisting': False} 2025-01-03 12:11:58,732:DEBUG:12994228224:After sleep, continuing to processing 2025-01-03 12:12:29,131:ERROR:12994228224:In sync_phone_to_server, while executing update_query={'user_id': UUID('34da08c9-e7a7-4f91-bf65-bf6b6d970c32'), 'metadata.type': 'message', 'metadata.write_ts': 1735933996.3793979, 'metadata.key': 'stats/client_time'} on document={'$set': {'data': {'ts': 1735933996.369, 'client_app_version': '1.9.6', 'name': 'onboarding_state', 'client_os_version': '18.1', 'reading': {'route': 1, 'opcode': 'nrelop_dev-emulator-study_default_testdbfail'}}, 'metadata': {'time_zone': 'America/Los_Angeles', 'plugin': 'none', 'write_ts': 1735933996.3793979, 'platform': 'ios', 'read_ts': 0, 'key': 'stats/client_time', 'type': 'message'}, 'user_id': UUID('34da08c9-e7a7-4f91-bf65-bf6b6d970c32')}} 2025-01-03 12:12:29,133:ERROR:12994228224:localhost:27017: [Errno 61] Connection refused, Timeout: 30s, Topology Description: ]> ``` --- emission/net/api/usercache.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/emission/net/api/usercache.py b/emission/net/api/usercache.py index d80cd03df..b2066b243 100644 --- a/emission/net/api/usercache.py +++ b/emission/net/api/usercache.py @@ -51,14 +51,19 @@ def sync_phone_to_server(uuid, data_from_phone): 'metadata.type': data["metadata"]["type"], 'metadata.write_ts': data["metadata"]["write_ts"], 'metadata.key': data["metadata"]["key"]} - result = usercache_db.update_one(update_query, - document, - upsert=True) - logging.debug("Updated result for user = %s, key = %s, write_ts = %s = %s" % - (uuid, data["metadata"]["key"], data["metadata"]["write_ts"], result.raw_result)) + try: + result = usercache_db.update_one(update_query, + document, + upsert=True) + logging.debug("Updated result for user = %s, key = %s, write_ts = %s = %s" % + (uuid, data["metadata"]["key"], data["metadata"]["write_ts"], result.raw_result)) - # 
I am not sure how to trigger a writer error to test this - # and whether this is the format expected from the server in the rawResult - if 'ok' in result.raw_result and result.raw_result['ok'] != 1.0: - logging.error("In sync_phone_to_server, err = %s" % result.raw_result['writeError']) - raise Exception() + # I am not sure how to trigger a writer error to test this + # and whether this is the format expected from the server in the rawResult + if 'ok' in result.raw_result and result.raw_result['ok'] != 1.0: + logging.error("In sync_phone_to_server, err = %s" % result.raw_result['writeError']) + raise Exception() + except pymongo.errors.PyMongoError as e: + logging.error(f"In sync_phone_to_server, while executing {update_query=} on {document=}") + logging.exception(e) + raise From 1ec706e7e0d51cee3dd5942f87b45f3a9b9dd86d Mon Sep 17 00:00:00 2001 From: Shankari Date: Fri, 3 Jan 2025 21:50:48 -0800 Subject: [PATCH 2/2] Munge incoming entries to avoid keys with dots On around Dec 21st 2024, it looks like firebase changed the format of their push notifications to add in some metadata into the `additionalData` field. This metadata has keys with dots. Since we also use the `additionalData` field to pass in the survey or popup message for custom push notifications, we store the entire `additionalData` into the notification stat. When this notification is pushed up to the server, it cannot be stored in the database, since MongoDB/DocumentDB do not support keys with dots. 
https://stackoverflow.com/questions/66369545/documentdb-updateone-fails-with-163-name-is-not-valid-for-storage While trying to save the entry, we get the error ``` Traceback (most recent call last): File "/usr/src/app/emission/net/api/bottle.py", line 997, in _handle out = route.call(**args) File "/usr/src/app/emission/net/api/bottle.py", line 1998, in wrapper rv = callback(*a, **ka) File "/usr/src/app/emission/net/api/cfc_webapp.py", line 249, in putIntoCache return usercache.sync_phone_to_server(user_uuid, from_phone) File "/usr/src/app/emission/net/api/usercache.py", line 54, in sync_phone_to_server result = usercache_db.update_one(update_query, File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 1041, in update_one self._update_retryable( File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 836, in _update_retryable return self.__database.client._retryable_write( File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1476, in _retryable_write return self._retry_with_session(retryable, func, s, None) File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1349, in _retry_with_session return self._retry_internal(retryable, func, session, bulk) File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/_csot.py", line 105, in csot_wrapper return func(self, *args, **kwargs) File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1390, in _retry_internal return func(session, sock_info, retryable) File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 817, in _update return self._update( File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 782, in _update _check_write_command_response(result) File 
"/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/helpers.py", line 217, in _check_write_command_response _raise_last_write_error(write_errors) File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/helpers.py", line 190, in _raise_last_write_error raise WriteError(error.get("errmsg"), error.get("code"), error) pymongo.errors.WriteError: Name is not valid for storage, full error: {'index': 0, 'code': 163, 'errmsg': 'Name is not valid for storage'} ``` This is bad because this error interrupts the processing of the incoming data, and causes the `/usercache/put` call to fail. The phone keeps trying to upload this data over and over, and failing over and over, so the pipeline never makes progress, and deployers are not able to see newly processed data in their admin dashboards. To fix this, and make the ingestion code more robust in general, we check the incoming data for keys with dots and munge them. This will fix this immediately, and will also ensure that we don't encounter similar storage failures from other dotted keys in the future. Testing done: - Added a new unit test that invokes the function directly - Added a new integration test that creates entries and calls `sync_phone_to_server` on them Both tests pass --- emission/net/api/usercache.py | 17 +++++ .../TestBuiltinUserCacheHandlerInput.py | 72 +++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/emission/net/api/usercache.py b/emission/net/api/usercache.py index b2066b243..c16e62acc 100644 --- a/emission/net/api/usercache.py +++ b/emission/net/api/usercache.py @@ -29,6 +29,19 @@ def sync_server_to_phone(uuid): # logging.debug("retrievedData = %s" % retrievedData) return retrievedData +def _remove_dots(entry_doc): + for key in entry_doc: + # print(f"Checking {key=}") + if isinstance(entry_doc[key], dict): + # print(f"Found dict for {key=}, recursing") + _remove_dots(entry_doc[key]) + if '.' 
in key: + munged_key = key.replace(".", "_") + logging.info(f"Found {key=} with dot, munged to {munged_key=}") + # Get and delete in one swoop + # https://stackoverflow.com/a/11277439 + entry_doc[munged_key] = entry_doc.pop(key, None) + def sync_phone_to_server(uuid, data_from_phone): """ Puts the blob from the phone into the cache @@ -44,6 +57,10 @@ def sync_phone_to_server(uuid, data_from_phone): if "ts" in data["data"] and ecc.isMillisecs(data["data"]["ts"]): data["data"]["ts"] = old_div(float(data["data"]["ts"]), 1000) + + # mongodb/documentDB don't support field names with `.` + # let's convert them all to `_` + _remove_dots(data) # logging.debug("After updating with UUId, we get %s" % data) document = {'$set': data} diff --git a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py index 3d024be06..3f504af8e 100644 --- a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py +++ b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py @@ -14,6 +14,7 @@ import uuid import attrdict as ad import time +import copy import geojson as gj # This change should be removed in the next server update, by which time hopefully the new geojson version will incorporate the long-term fix for their default precision # See - jazzband/geojson#177 @@ -273,6 +274,77 @@ def testTwoLongTermCalls(self): self.assertEqual(edb.get_timeseries_db().estimated_document_count(), 120) self.assertEqual(edb.get_timeseries_error_db().estimated_document_count(), 0) + def testRemoteDots(self): + test_template = {"ts":1735934360.256, + "client_app_version":"1.9.6", + "name":"open_notification", + "client_os_version":"15.5", + "reading":{ + "additionalData":{ + "google.c.sender.id":"FAKE_SENDER_ID", + "coldstart":False, + "notId":"1122334455667788", + "payload":1122334455667788, + "content-available":1, + "foreground":False, + "google.c.fid":"FAKE_FID", + "gcm.message_id":"FAKE_MESSAGE_ID"}}} + test_1 = 
copy.copy(test_template) + self.assertEqual(len(test_1["reading"]["additionalData"]), 8) + self.assertIn("google.c.sender.id", + test_1["reading"]["additionalData"]) + self.assertIn("google.c.fid", + test_1["reading"]["additionalData"]) + self.assertIn("gcm.message_id", + test_1["reading"]["additionalData"]) + mauc._remove_dots(test_1) + self.assertEqual(len(test_1["reading"]["additionalData"]), 8) + self.assertIn("google_c_sender_id", + test_1["reading"]["additionalData"]) + self.assertIn("google_c_fid", + test_1["reading"]["additionalData"]) + self.assertIn("gcm_message_id", + test_1["reading"]["additionalData"]) + self.assertNotIn("google.c.sender.id", + test_1["reading"]["additionalData"]) + self.assertNotIn("google.c.fid", + test_1["reading"]["additionalData"]) + self.assertNotIn("gcm.message_id", + test_1["reading"]["additionalData"]) + + metadata_template = {'plugin': 'none', + 'write_ts': self.curr_ts - 25, + 'time_zone': u'America/Los_Angeles', + 'platform': u'ios', + 'key': u'stats/client_time', + 'read_ts': self.curr_ts - 27, + 'type': u'message'} + + # there are 30 entries in the setup function + self.assertEqual(len(self.uc1.getMessage()), 30) + + three_entries_with_dots = [] + for i in range(3): + curr_md = copy.copy(metadata_template) + curr_md['write_ts'] = self.curr_ts - 25 + i + three_entries_with_dots.append({ + 'user_id': self.testUserUUID1, + 'data': copy.copy(test_template), + 'metadata': curr_md}) + + print(f"AFTER {[e.get('metadata', None) for e in three_entries_with_dots]}") + + mauc.sync_phone_to_server(self.testUserUUID1, three_entries_with_dots) + # we have munged, so these new entries should also be saved + # and we should have 33 entries in the usercache + self.assertEqual(len(self.uc1.getMessage()), 33) + self.assertEqual(len(list(self.ts1.find_entries())), 0) + enuah.UserCacheHandler.getUserCacheHandler(self.testUserUUID1).moveToLongTerm() + # since they were munged before saving into the usercache, + # there should be no errors 
while copying to the timeseries + self.assertEqual(len(self.uc1.getMessage()), 0) + self.assertEqual(len(list(self.ts1.find_entries())), 33) + if __name__ == '__main__': import emission.tests.common as etc