Skip to content

Commit

Permalink
DBZ-8265 fix journal processing loop when we paginate to last receive…
Browse files Browse the repository at this point in the history
…r before receiver offset reset
  • Loading branch information
msillence committed Sep 24, 2024
1 parent c3565cd commit 50527fe
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,35 +166,28 @@ static class RangeFinder {

public Optional<PositionRange> next(DetailedJournalReceiver nextReceiver) {
if (found) {
// if the next journal has wrapped use just go to the end
// if the next journal has wrapped use just go to the end of the previous one
if (lastReceiver != null && nextReceiver.start().compareTo(lastReceiver.end()) < 0) {
// we're at the end and we've processed it move start on to next receiver
if (startEqualsEndAndProcessed(startPosition, lastReceiver)) {
startPosition.setPosition(new JournalPosition(nextReceiver.start(), nextReceiver.info().receiver()), false);
}
else {
// the only way we can get here is if we have already checked for pagination
// when we found the start so we should never need to paginate
// it is inexpensive and safer to check
final Optional<PositionRange> paginated = rangeWithinCurrentPosition(lastReceiver,
startPosition.getOffset());
if (paginated.isPresent()) {
return paginated;
}
return Optional.of(new PositionRange(false, startPosition,
new JournalPosition(lastReceiver.end(), lastReceiver.info().receiver())));
}
}

final Optional<PositionRange> r = rangeWithinCurrentPosition(nextReceiver, nextReceiver.start());
if (r.isPresent()) {
return r;
}
lastReceiver = nextReceiver;
return r;
}
if (nextReceiver.isSameReceiver(startPosition)) {
found = true;
final Optional<PositionRange> r = rangeWithinCurrentPosition(nextReceiver, startPosition.getOffset());
if (r.isPresent()) {
else {
if (nextReceiver.isSameReceiver(startPosition)) {
found = true;
final Optional<PositionRange> r = rangeWithinCurrentPosition(nextReceiver, startPosition.getOffset());
lastReceiver = nextReceiver;
return r;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public RetrieveJournal(RetrieveConfig config, JournalInfoRetrieval journalRetrie
public boolean retrieveJournal(JournalProcessedPosition previousPosition) throws Exception {

final PositionRange range = journalReceivers.findRange(config.as400().connection(), previousPosition);

return retrieveJournal(previousPosition, range);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibm.as400.access.AS400;

Expand Down Expand Up @@ -503,4 +505,110 @@ void testStartEqualsEndProcessedResetReceiversPaginate() {
assertEquals(new JournalPosition(BigInteger.valueOf(5l), j2.info().receiver()), found.get().end());
assertFalse(found.get().startEqualsEnd());
}

private static final Logger log = LoggerFactory.getLogger(ReceiverPaginationTest.class);

@Test
void testStopBeforeJournalResetsPaginateOver() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(16), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end(), position.get().end().getOffset());

}

@Test
void testStopBeforeJournalResetsPaginateExact() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(15), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end(), position.get().end().getOffset());

}

@Test
void testStopOneBeforeJournalResetsPaginate() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(14), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end().subtract(BigInteger.ONE), position.get().end().getOffset());
}

@Test
void testSkippingOverEndOfFirst() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 40, journalInfo);
final DetailedJournalReceiver j0 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j0", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(111111), Optional.of(new JournalReceiver("j1", "jlib")), 1, 1);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(21), BigInteger.valueOf(30), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(3),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(31), BigInteger.valueOf(40), Optional.of(new JournalReceiver("j4", "jlib")), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j0, j1, j2, j3);

final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(111111),
j0.info().receiver(), Instant.ofEpochSecond(10), true);

final Optional<PositionRange> found = jreceivers.findPosition(start, BigInteger.valueOf(40), list, j1);
assertEquals(start, found.get().start());
assertEquals(new JournalPosition(BigInteger.valueOf(40), j3.info().receiver()), found.get().end());
}

}

0 comments on commit 50527fe

Please sign in to comment.