From de67eb808b4e9838c584b39a63aaf09af7bfec79 Mon Sep 17 00:00:00 2001 From: martin Date: Mon, 23 Sep 2024 16:55:21 +0100 Subject: [PATCH] DBZ-8265 fix journal processing loop when we paginate to last receiver before receiver offset reset --- .../journal/retrieve/ReceiverPagination.java | 9 +-- .../db2/journal/retrieve/RetrieveJournal.java | 1 + .../retrieve/ReceiverPaginationTest.java | 78 +++++++++++++++++++ 3 files changed, 80 insertions(+), 8 deletions(-) diff --git a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java index 4e8e999..80ab288 100644 --- a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java +++ b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java @@ -166,7 +166,7 @@ static class RangeFinder { public Optional next(DetailedJournalReceiver nextReceiver) { if (found) { - // if the next journal has wrapped use just go to the end + // if the next journal has wrapped use just go to the end of the previous one if (lastReceiver != null && nextReceiver.start().compareTo(lastReceiver.end()) < 0) { // we're at the end and we've processed it move start on to next receiver if (startEqualsEndAndProcessed(startPosition, lastReceiver)) { @@ -174,13 +174,6 @@ public Optional next(DetailedJournalReceiver nextReceiver) { } else { // the only way we can get here is if we have already checked for pagination - // when we found the start so we should never need to paginate - // it is inexpensive and safer to check - final Optional paginated = rangeWithinCurrentPosition(lastReceiver, - startPosition.getOffset()); - if (paginated.isPresent()) { - return paginated; - } return Optional.of(new PositionRange(false, startPosition, new JournalPosition(lastReceiver.end(), lastReceiver.info().receiver()))); } diff --git a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java index d2c5af8..1b261b7 100644 --- a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java +++ b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java @@ -83,6 +83,7 @@ public RetrieveJournal(RetrieveConfig config, JournalInfoRetrieval journalRetrie public boolean retrieveJournal(JournalProcessedPosition previousPosition) throws Exception { final PositionRange range = journalReceivers.findRange(config.as400().connection(), previousPosition); + log.debug("retrieve journal start {} end {}", range.start(), range.end()); return retrieveJournal(previousPosition, range); } diff --git a/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java b/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java index 3306948..3bae707 100644 --- a/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java +++ b/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.ibm.as400.access.AS400; @@ -503,4 +505,80 @@ void testStartEqualsEndProcessedResetReceiversPaginate() { assertEquals(new JournalPosition(BigInteger.valueOf(5l), j2.info().receiver()), found.get().end()); assertFalse(found.get().startEqualsEnd()); } + + private static final Logger log = LoggerFactory.getLogger(ReceiverPaginationTest.class); + + @Test + void testStopBeforeJournalResetsPaginateOver() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(16), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end(), position.get().end().getOffset()); + + } + + @Test + void testStopBeforeJournalResetsPaginateExact() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(15), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end(), position.get().end().getOffset()); + + } + + @Test + void testStopOneBeforeJournalResetsPaginate() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(14), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end().subtract(BigInteger.ONE), position.get().end().getOffset()); + } }