Skip to content

Commit 109a1b8

Browse files
authored
feat: Duplicate granules diff index (#499)
* breaking: using latest uds-lib + update docker
* feat: use latest uds-lib
* fix: in progress updating code to avoid duplication
* feat: add test case + fix some typo
1 parent eee9dcf commit 109a1b8

File tree

2 files changed

+325
-14
lines changed

2 files changed

+325
-14
lines changed

cumulus_lambda_functions/lib/uds_db/granules_db_index.py

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,20 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
165165
self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
166166
try:
167167
self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
168-
except Exception as e:
168+
except:
169169
LOGGER.exception(f'failed to migrate index data: {(current_perc_index_name, new_perc_index_name)}')
170170
return
171171

172-
def get_latest_index(self, tenant, tenant_venue):
172+
def get_latest_index_name(self, tenant, tenant_venue):
173173
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
174174
write_alias_name = self.__es.get_alias(write_alias_name)
175175
if len(write_alias_name) != 1:
176176
raise ValueError(f'missing index for {tenant}_{tenant_venue}. {write_alias_name}')
177177
latest_index_name = [k for k in write_alias_name.keys()][0]
178+
return latest_index_name
179+
180+
def get_latest_index(self, tenant, tenant_venue):
181+
latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
178182
index_mapping = self.__es.get_index_mapping(latest_index_name)
179183
if index_mapping is None:
180184
raise ValueError(f'missing index: {latest_index_name}')
@@ -214,37 +218,87 @@ def get_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
214218
raise ValueError(f"no such granule: {doc_id}")
215219
return result
216220

217-
def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
221+
def __query_by_id_local(self, tenant: str, tenant_venue: str, doc_id: str, ):
218222
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
219-
result = self.__es.query({
223+
dsl = {
220224
'size': 9999,
221-
'query': {'term': {'_id': doc_id}}
222-
}, read_alias_name)
223-
if result is None:
224-
raise ValueError(f"no such granule: {doc_id}")
225-
for each_granule in result['hits']['hits']:
225+
'sort': [
226+
{'properties.datetime': {'order': 'desc'}},
227+
{'id': {'order': 'asc'}}
228+
],
229+
'query': {
230+
'term': {'_id': doc_id}
231+
}
232+
}
233+
result = self.__es.query(dsl, read_alias_name)
234+
if result is None or len(result['hits']['hits']) < 1:
235+
return []
236+
return result['hits']['hits']
237+
238+
def __delete_old_entries(self, dsl_result):
239+
for each_granule in dsl_result:
226240
LOGGER.debug(f"deleting {each_granule['_id']} from {each_granule['_index']}")
227241
delete_result = self.__es.delete_by_query({
228242
'query': {'term': {'id': each_granule['_id']}}
229243
}, each_granule['_index'])
230244
LOGGER.debug(f'delete_result: {delete_result}')
231245
if delete_result is None:
232246
raise ValueError(f"error deleting {each_granule}")
247+
return
248+
249+
def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
250+
result = self.__query_by_id_local(tenant, tenant_venue, doc_id)
251+
if len(result) < 1:
252+
raise ValueError(f"no such granule: {doc_id}")
253+
self.__delete_old_entries(result)
233254
return result
234255

235256
def update_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
257+
# find existing doc_id
258+
# if not found, throw error. Cannot update
259+
# if found, check index.
260+
# if latest index, proceed with update
261+
# if older index, proceed with get + delete
262+
# tweak meta locally, and add it.
236263
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
237264
json_body['event_time'] = TimeUtils.get_current_unix_milli()
238-
self.__es.update_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
239-
LOGGER.debug(f'custom_metadata indexed')
265+
existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
266+
if len(existing_entries) < 1:
267+
raise ValueError(f'unable to update {doc_id} as it is not found. ')
268+
latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
269+
existing_entry = existing_entries[0]
270+
if existing_entry['_index'] == latest_index_name:
271+
LOGGER.debug(f'{doc_id} in latest index: {latest_index_name}. continuing with update')
272+
self.__es.update_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
273+
self.__delete_old_entries(existing_entries[1:])
274+
return
275+
LOGGER.debug(f'{doc_id} in older index: {latest_index_name} v. {existing_entry["_index"]}')
276+
new_doc = {**existing_entry['_source'], **json_body}
277+
self.__es.index_one(new_doc, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
278+
self.__delete_old_entries(existing_entries)
240279
return
241280

242281
def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
282+
# find existing doc_id
283+
# if not found, add it
284+
# if found, and it is in latest index, add it.
285+
# if found, and it is in older index, add current one, and delete the older one.
286+
243287
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
244288
json_body['event_time'] = TimeUtils.get_current_unix_milli()
245-
# TODO validate custom metadata vs the latest index to filter extra items
289+
existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
290+
if len(existing_entries) < 1:
291+
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
292+
return
293+
latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
294+
existing_entry = existing_entries[0]
295+
if existing_entry['_index'] == latest_index_name:
296+
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
297+
self.__delete_old_entries(existing_entries[1:])
298+
return
246299
self.__es.index_one(json_body, doc_id, index=write_alias_name) # TODO assuming granule_id is prefixed with collection id
247-
LOGGER.debug(f'custom_metadata indexed')
300+
self.__delete_old_entries(existing_entries)
301+
# TODO validate custom metadata vs the latest index to filter extra items
248302
return
249303

250304
def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):

tests/cumulus_lambda_functions/lib/uds_db/test_granules_db_index.py

Lines changed: 258 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import json
22
import os
3+
from copy import deepcopy
4+
from time import sleep
35
from unittest import TestCase
46

7+
from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
8+
from mdps_ds_lib.lib.aws.es_abstract import ESAbstract
9+
from mdps_ds_lib.lib.aws.es_factory import ESFactory
10+
511
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
612

713

@@ -50,4 +56,255 @@ def test_02(self):
5056
query_result = granules_index.dsl_search(self.tenant, self.tenant_venue, search_dsl)
5157
print(json.dumps(query_result, indent=4))
5258
# self.assertEqual(custom_metadata, expected)
53-
return
59+
return
60+
61+
def test_complete(self):
62+
"""
63+
Steps:
64+
1. Create index 1
65+
2. Add doc 1
66+
3. Add doc 2
67+
4. Create index 2
68+
5. Add doc 3
69+
6. Update doc 1
70+
7. make sure index 1: doc 1 is disappeared
71+
8. Update doc 3
72+
9. make sure index 2 : doc 3 is updated
73+
10. Create index 3
74+
11. Update doc 4
75+
12. It should throw error.
76+
13. Update doc 2. Make sure index 1 : doc 2 is removed
77+
14. Update doc 3. Make sure index 2 : doc 3 is removed
78+
79+
80+
:return:
81+
"""
82+
os.environ['ES_URL'] = 'https://vpc-uds-sbx-cumulus-es-qk73x5h47jwmela5nbwjte4yzq.us-west-2.es.amazonaws.com'
83+
os.environ['ES_PORT'] = '9200'
84+
granules_db_index = GranulesDbIndex()
85+
es: ESAbstract = ESFactory().get_instance('AWS',
86+
index=DBConstants.collections_index,
87+
base_url=os.getenv('ES_URL'),
88+
port=int(os.getenv('ES_PORT', '443'))
89+
)
90+
91+
self.tenant = 'UDS_LOCAL_UNIT_TEST' # 'uds_local_test' # 'uds_sandbox'
92+
self.tenant_venue = 'UNIT' # 'DEV1' # 'dev'
93+
self.collection_name = 'UDS_UNIT_TEST_1'
94+
self.collection_version = '001'
95+
collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}'
96+
self.custom_metadata_body1 = {
97+
'tag': {'type': 'keyword'},
98+
'c_data1': {'type': 'long'},
99+
}
100+
self.custom_metadata_body2 = {
101+
'tag': {'type': 'keyword'},
102+
'c_data2': {'type': 'long'},
103+
}
104+
self.custom_metadata_body3 = {
105+
'tag': {'type': 'keyword'},
106+
'c_data3': {'type': 'long'},
107+
}
108+
109+
granule_id1 = f'{collection_id}:test_file01'
110+
granule_id2 = f'{collection_id}:test_file02'
111+
granule_id3 = f'{collection_id}:test_file03'
112+
granule_id4 = f'{collection_id}:test_file04'
113+
114+
mock_feature1 = {
115+
"type": "Feature",
116+
"stac_version": "1.0.0",
117+
"id": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09",
118+
"properties": {
119+
"datetime": "2024-11-26T23:37:15.288000Z",
120+
"start_datetime": "2016-01-31T18:00:00.009000Z",
121+
"end_datetime": "2016-01-31T19:59:59.991000Z",
122+
"created": "1970-01-01T00:00:00Z",
123+
"updated": "2024-11-26T23:38:01.692000Z",
124+
"status": "completed",
125+
"provider": "unity",
126+
},
127+
"geometry": {
128+
"type": "Point",
129+
"coordinates": [
130+
0.0,
131+
0.0
132+
]
133+
},
134+
"links": [
135+
{
136+
"rel": "collection",
137+
"href": "."
138+
}
139+
],
140+
"assets": {
141+
"test_file09.nc": {
142+
"href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc",
143+
"title": "test_file09.nc",
144+
"description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
145+
"file:size": 0,
146+
"file:checksum": "00000000000000000000000000000000",
147+
"roles": [
148+
"data"
149+
]
150+
},
151+
"test_file09.nc.cas": {
152+
"href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc.cas",
153+
"title": "test_file09.nc.cas",
154+
"description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
155+
"file:size": 0,
156+
"file:checksum": "00000000000000000000000000000000",
157+
"roles": [
158+
"metadata"
159+
]
160+
},
161+
"test_file09.nc.stac.json": {
162+
"href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.nc.stac.json",
163+
"title": "test_file09.nc.stac.json",
164+
"description": "size=0;checksumType=md5;checksum=00000000000000000000000000000000;",
165+
"file:size": 0,
166+
"file:checksum": "00000000000000000000000000000000",
167+
"roles": [
168+
"metadata"
169+
]
170+
},
171+
"test_file09.cmr.xml": {
172+
"href": "s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001:test_file09/test_file09.cmr.xml",
173+
"title": "test_file09.cmr.xml",
174+
"description": "size=1716;checksumType=md5;checksum=f842ba4e23e76ae81014a01c820b01f7;",
175+
"file:size": 1716,
176+
"file:checksum": "f842ba4e23e76ae81014a01c820b01f7",
177+
"roles": [
178+
"metadata"
179+
]
180+
}
181+
},
182+
"bbox": {
183+
"type": "envelope",
184+
"coordinates": [
185+
[
186+
-180.0,
187+
90.0
188+
],
189+
[
190+
180.0,
191+
-90.0
192+
]
193+
]
194+
},
195+
"stac_extensions": [
196+
"https://stac-extensions.github.io/file/v2.1.0/schema.json"
197+
],
198+
"collection": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:KKK-09___001",
199+
"event_time": 1732664287722
200+
}
201+
mock_feature2 = deepcopy(mock_feature1)
202+
mock_feature3 = deepcopy(mock_feature1)
203+
mock_feature4 = deepcopy(mock_feature1)
204+
mock_feature1['id'] = granule_id1
205+
mock_feature2['id'] = granule_id2
206+
mock_feature3['id'] = granule_id3
207+
mock_feature4['id'] = granule_id4
208+
209+
new_index_name1 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v01'.lower().strip()
210+
new_index_name2 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v02'.lower().strip()
211+
new_index_name3 = f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}__v03'.lower().strip()
212+
213+
if es.has_index(new_index_name1):
214+
es.delete_index(new_index_name1)
215+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v01'.lower().strip())
216+
if es.has_index(new_index_name2):
217+
es.delete_index(new_index_name2)
218+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v02'.lower().strip())
219+
if es.has_index(new_index_name3):
220+
es.delete_index(new_index_name3)
221+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v03'.lower().strip())
222+
223+
224+
#### index v1 ####
225+
granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body1)
226+
sleep(2)
227+
self.assertTrue(es.has_index(new_index_name1), f'missing {new_index_name1}')
228+
229+
granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature1, granule_id1)
230+
sleep(2)
231+
check_result = es.query_by_id(granule_id1, new_index_name1)
232+
self.assertTrue(check_result is not None, f'granule_id1 - new_index_name1 {check_result}')
233+
234+
granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature2, granule_id2)
235+
sleep(2)
236+
check_result = es.query_by_id(granule_id2, new_index_name1)
237+
self.assertTrue(check_result is not None, f'granule_id2 - new_index_name1 {check_result}')
238+
239+
#### index v2 ####
240+
granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body2)
241+
sleep(2)
242+
self.assertTrue(es.has_index(new_index_name2), f'missing {new_index_name2}')
243+
244+
granules_db_index.add_entry(self.tenant, self.tenant_venue, mock_feature3, granule_id3)
245+
sleep(2)
246+
check_result = es.query_by_id(granule_id3, new_index_name2)
247+
self.assertTrue(check_result is not None, f'granule_id3 - new_index_name2 {check_result}')
248+
249+
check_result = es.query_by_id(granule_id3, new_index_name1)
250+
self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None{check_result}')
251+
252+
granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id1)
253+
sleep(2)
254+
check_result = es.query_by_id(granule_id1, new_index_name2)
255+
self.assertTrue(check_result is not None, f'granule_id1 - new_index_name2 {check_result}')
256+
257+
check_result = es.query_by_id(granule_id1, new_index_name1)
258+
self.assertTrue(check_result is None, f'granule_id1 - new_index_name1 is not None{check_result}')
259+
260+
granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_successful'}, granule_id3)
261+
sleep(2)
262+
check_result = es.query_by_id(granule_id3, new_index_name2)
263+
self.assertTrue(check_result is not None, f'granule_id3 - new_index_name2 {check_result}')
264+
265+
check_result = es.query_by_id(granule_id3, new_index_name1)
266+
self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None{check_result}')
267+
268+
#### index v3 ####
269+
granules_db_index.create_new_index(self.tenant, self.tenant_venue, self.custom_metadata_body3)
270+
sleep(2)
271+
self.assertTrue(es.has_index(new_index_name3), f'missing {new_index_name3}')
272+
273+
with self.assertRaises(ValueError) as context:
274+
granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id4)
275+
sleep(2)
276+
self.assertTrue(str(context.exception).startswith('unable to update'))
277+
# TODO check error
278+
granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_r_failed'}, granule_id2)
279+
sleep(2)
280+
check_result = es.query_by_id(granule_id2, new_index_name3)
281+
self.assertTrue(check_result is not None, f'granule_id2 - new_index_name3 {check_result}')
282+
283+
check_result = es.query_by_id(granule_id2, new_index_name1)
284+
self.assertTrue(check_result is None, f'granule_id2 - new_index_name1 is not None{check_result}')
285+
286+
check_result = es.query_by_id(granule_id2, new_index_name2)
287+
self.assertTrue(check_result is None, f'granule_id2 - new_index_name2 is not None{check_result}')
288+
289+
granules_db_index.update_entry(self.tenant, self.tenant_venue, {'archive_status': 'cnm_s_failed'}, granule_id3)
290+
sleep(2)
291+
check_result = es.query_by_id(granule_id3, new_index_name3)
292+
self.assertTrue(check_result is not None, f'granule_id3 - new_index_name3 {check_result}')
293+
294+
check_result = es.query_by_id(granule_id3, new_index_name1)
295+
self.assertTrue(check_result is None, f'granule_id3 - new_index_name1 is not None{check_result}')
296+
297+
check_result = es.query_by_id(granule_id3, new_index_name2)
298+
self.assertTrue(check_result is None, f'granule_id3 - new_index_name2 is not None{check_result}')
299+
300+
if es.has_index(new_index_name1):
301+
es.delete_index(new_index_name1)
302+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v01'.lower().strip())
303+
if es.has_index(new_index_name2):
304+
es.delete_index(new_index_name2)
305+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v02'.lower().strip())
306+
if es.has_index(new_index_name3):
307+
es.delete_index(new_index_name3)
308+
es.delete_index(f'{DBConstants.granules_index_prefix}_{self.tenant}_{self.tenant_venue}_perc__v03'.lower().strip())
309+
310+
return

0 commit comments

Comments (0)