Skip to content

Commit

Permalink
SOLR-17397: Fix SkipExistingDocumentsProcessor to handle child docume… (
Browse files Browse the repository at this point in the history
#2631)

Properly deal with child documents.
---------

Co-authored-by: Eric Pugh <epugh@opensourceconnections.com>
  • Loading branch information
timatbw and epugh authored Aug 13, 2024
1 parent c7af8b8 commit 8ddc3be
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
2 changes: 1 addition & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ New Features

Improvements
---------------------
(No changes)
* SOLR-17397: SkipExistingDocumentsProcessor now functions correctly with child documents. (Tim Owens via Eric Pugh)

Optimizations
---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ boolean isSkipUpdateIfMissing() {
return this.skipUpdateIfMissing;
}

boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
boolean doesDocumentExist(BytesRef indexedDocId) {
assert null != indexedDocId;

// we don't need any fields populated, we just need to know if the doc is in the tlog...
Expand Down Expand Up @@ -224,6 +224,17 @@ boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
}
}

boolean doesChildDocumentExist(AddUpdateCommand cmd) throws IOException {
return RealTimeGetComponent.getInputDocument(
core,
core.getLatestSchema().indexableUniqueKey(cmd.getSelfOrNestedDocIdStr()),
cmd.getIndexedId(),
null,
null,
RealTimeGetComponent.Resolution.DOC)
!= null;
}

boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return false;
Expand All @@ -241,38 +252,46 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {

boolean isUpdate = AtomicUpdateDocumentMerger.isAtomicUpdate(cmd);

// boolean existsByLookup = (RealTimeGetComponent.getInputDocument(core, indexedDocId) !=
// null);
// if (docExists != existsByLookup) {
// log.error("Found docExists {} but existsByLookup {} for doc {}", docExists,
// existsByLookup, indexedDocId.utf8ToString());
// }

if (log.isDebugEnabled()) {
log.debug(
"Document ID {} ... exists already? {} ... isAtomicUpdate? {} ... isLeader? {}",
indexedDocId.utf8ToString(),
cmd.getPrintableId(),
doesDocumentExist(indexedDocId),
isUpdate,
isLeader(cmd));
}

if (skipInsertIfExists && !isUpdate && isLeader(cmd) && doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping insert for pre-existing document ID {}", indexedDocId.utf8ToString());
log.debug("Skipping insert for pre-existing document ID {}", cmd.getPrintableId());
}
return;
}

if (skipUpdateIfMissing && isUpdate && isLeader(cmd) && !doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
if (skipUpdateIfMissing && isUpdate && isLeader(cmd)) {
if (!cmd.getSelfOrNestedDocIdStr().equals(cmd.getIndexedIdStr())) {
// must be a child document
if (!doesChildDocumentExist(cmd)) {
if (log.isDebugEnabled()) {
log.debug(
"Skipping update to non-existent child document ID {}",
cmd.getSelfOrNestedDocIdStr());
}
return;
}
} else {
// not a child document
if (!doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", cmd.getPrintableId());
}
return;
}
}
return;
}

if (log.isDebugEnabled()) {
log.debug("Passing on document ID {}", indexedDocId.utf8ToString());
log.debug("Passing on document ID {}", cmd.getPrintableId());
}

super.processAdd(cmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testSkippableInsertIsNotSkippedIfNotLeader() throws IOException {
}

@Test
public void testSkippableInsertIsNotSkippedIfSkipInsertsFalse() throws IOException {
public void testSkippableInsertIsNotSkippedIfSkipInsertsIsFalse() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, false));
Expand All @@ -245,7 +245,7 @@ public void testSkippableInsertIsNotSkippedIfSkipInsertsFalse() throws IOExcepti
}

@Test
public void testSkippableInsertIsSkippedIfSkipInsertsTrue() throws IOException {
public void testSkippableInsertIsSkippedIfSkipInsertsIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, true, false));
Expand All @@ -259,7 +259,7 @@ public void testSkippableInsertIsSkippedIfSkipInsertsTrue() throws IOException {
}

@Test
public void testNonSkippableInsertIsNotSkippedIfSkipInsertsTrue() throws IOException {
public void testNonSkippableInsertIsNotSkippedIfSkipInsertsIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, true, false));
Expand Down Expand Up @@ -287,7 +287,7 @@ public void testSkippableUpdateIsNotSkippedIfNotLeader() throws IOException {
}

@Test
public void testSkippableUpdateIsNotSkippedIfSkipUpdatesFalse() throws IOException {
public void testSkippableUpdateIsNotSkippedIfSkipUpdatesIsFalse() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, false));
Expand All @@ -301,7 +301,7 @@ public void testSkippableUpdateIsNotSkippedIfSkipUpdatesFalse() throws IOExcepti
}

@Test
public void testSkippableUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
public void testSkippableUpdateIsSkippedIfSkipUpdatesIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));
Expand All @@ -315,7 +315,23 @@ public void testSkippableUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
}

@Test
public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
public void testSkippableChildDocUpdateIsSkippedIfSkipUpdatesIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(false).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getSelfOrNestedDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next, never()).processAdd(cmd);
}

@Test
public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));
Expand All @@ -328,6 +344,22 @@ public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOExcep
verify(next).processAdd(cmd);
}

@Test
public void testNonSkippableChildDocUpdateIsNotSkippedIfSkipUpdatesIsTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor =
Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(true).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getSelfOrNestedDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next).processAdd(cmd);
}

private AddUpdateCommand createInsertUpdateCmd(SolrQueryRequest req) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.setIndexedId(docId);
Expand Down

0 comments on commit 8ddc3be

Please sign in to comment.