Skip to content

Commit

Permalink
GEODE-10417: Fix NullPointerException in WAN replication (#7845)
Browse files Browse the repository at this point in the history
* GEODE-10417: Fix NullPointerException in WAN replication

When the WAN group-transa$ction-events feature is enabled in
a parallel gateway sender, it is possible to get a NullPointerException
when retrieving events from the queue to complete a transaction if
the event in the queue is null.

If this situation is reached then the gateway sender dispatcher will
not dispatch queue events anymore and therefore the WAN replication
will not progress.

This happens because the predicates that check if elements
in the queue contain a transactionId are not protected
against the event being null.

A null check has been added before the predicates are invoked
so that in case of a null event, the predicate is not invoked
and the event is skipped from the checking.

* GEODE-10417: Change assertEquals to assertThat
  • Loading branch information
albertogpz committed Sep 16, 2022
1 parent 3ada8fe commit 8b75180
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate e
List<Object> elementsMatching = new ArrayList<>();
for (final Object key : eventSeqNumDeque) {
Object object = optimalGet(key);
if (object == null) {
continue;
}

if (matchingPredicate.test(object)) {
elementsMatching.add(object);
eventSeqNumDeque.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -175,22 +174,65 @@ public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSo
List<Object> objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);

assertEquals(2, objects.size());
assertEquals(objects, Arrays.asList(event1, event3));
assertThat(objects.size()).isEqualTo(2);
assertThat(objects).isEqualTo(Arrays.asList(event1, event3));

objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
assertEquals(1, objects.size());
assertEquals(objects, Arrays.asList(event7));
assertThat(objects.size()).isEqualTo(1);
assertThat(objects).isEqualTo(Arrays.asList(event7));

hasTransactionIdPredicate =
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
assertEquals(2, objects.size());
assertEquals(objects, Arrays.asList(event2, event4));
assertThat(objects.size()).isEqualTo(2);
assertThat(objects).isEqualTo(Arrays.asList(event2, event4));
}

@Test
public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesObjectReadNullDoesNotThrowException()
throws ForceReattemptException {
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

TransactionId tx1 = new TXId(null, 1);
TransactionId tx2 = new TXId(null, 2);
TransactionId tx3 = new TXId(null, 3);

GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false);
GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false);
GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false);
GatewaySenderEventImpl event3 = null; // createMockGatewaySenderEvent(4, tx1, true);
GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false);
GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false);
GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);

this.bucketRegionQueue
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);

this.bucketRegionQueue.addToQueue(1L, event1);
this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
this.bucketRegionQueue.addToQueue(3L, event2);
this.bucketRegionQueue.addToQueue(4L, event3);
this.bucketRegionQueue.addToQueue(5L, event4);
this.bucketRegionQueue.addToQueue(6L, event5);
this.bucketRegionQueue.addToQueue(7L, event6);
this.bucketRegionQueue.addToQueue(8L, event7);

Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
when(bucketRegionQueue.getValueInVMOrDiskWithoutFaultIn(4L)).thenReturn(null);
List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);

assertThat(objects.size()).isEqualTo(2);
assertThat(objects).isEqualTo(Arrays.asList(new Object[] {event1, event7}));
}


@Test
public void testPeekedElementsArePossibleDuplicate()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.apache.geode.internal.cache.wan.serial;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
Expand Down Expand Up @@ -111,9 +110,9 @@ public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForT
queue.setGroupTransactionEvents(true);

List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
assertEquals(4, peeked.size());
assertThat(peeked.size()).isEqualTo(4);
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
assertEquals(3, peekedAfter.size());
assertThat(peekedAfter.size()).isEqualTo(3);
}

@Test
Expand Down Expand Up @@ -146,7 +145,7 @@ public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForT
.when(queue).getElementsMatching(any(), any(), anyLong());

List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
assertEquals(4, peeked.size());
assertThat(peeked.size()).isEqualTo(4);
}

@Test
Expand All @@ -155,11 +154,11 @@ public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllE
QUEUE_REGION, metaRegionFactory);

List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
assertEquals(3, peeked.size());
assertThat(peeked.size()).isEqualTo(3);
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
assertEquals(3, peekedAfter.size());
assertThat(peekedAfter.size()).isEqualTo(3);
peekedAfter = queue.peek(1, 100);
assertEquals(1, peekedAfter.size());
assertThat(peekedAfter.size()).isEqualTo(1);
}

@Test
Expand Down Expand Up @@ -192,7 +191,7 @@ public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllE
.when(queue).getElementsMatching(any(), any(), anyLong());

List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
assertEquals(3, peeked.size());
assertThat(peeked.size()).isEqualTo(3);
}

@Test
Expand All @@ -216,24 +215,23 @@ public void removeExtraPeekedEventDoesNotRemoveFromExtraPeekedIdsUntilPreviousEv
QUEUE_REGION, metaRegionFactory);
queue.setGroupTransactionEvents(true);
List<AsyncEvent<?, ?>> peeked = queue.peek(3, -1);
assertEquals(4, peeked.size());
assertThat(peeked.size()).isEqualTo(4);
assertThat(queue.getLastPeekedId()).isEqualTo(2);
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();

assertThat(queue.getExtraPeekedIds()).contains(5L);

for (Object ignored : peeked) {
queue.remove();
}
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
assertThat(queue.getExtraPeekedIds()).contains(5L);

peeked = queue.peek(3, -1);
assertEquals(3, peeked.size());
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
assertThat(peeked.size()).isEqualTo(3);
assertThat(queue.getExtraPeekedIds()).contains(5L);

for (Object ignored : peeked) {
queue.remove();
}
assertThat(queue.getExtraPeekedIds().contains(5L)).isFalse();
assertThat(queue.getExtraPeekedIds()).doesNotContain(5L);
}

private GatewaySenderEventImpl createMockGatewaySenderEventImpl(int transactionId,
Expand Down

0 comments on commit 8b75180

Please sign in to comment.