From f4391a884bb819cdafcc720a4e2d235249d23f95 Mon Sep 17 00:00:00 2001 From: martin Date: Mon, 23 Sep 2024 16:55:21 +0100 Subject: [PATCH] DBZ-8265 fix journal processing loop when we paginate to last receiver before receiver offset reset --- .../journal/retrieve/ReceiverPagination.java | 28 ++--- .../db2/journal/retrieve/RetrieveJournal.java | 1 - .../retrieve/ReceiverPaginationTest.java | 110 ++++++++++++++++++ 3 files changed, 120 insertions(+), 19 deletions(-) diff --git a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java index 4e8e999..fbc39f4 100644 --- a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java +++ b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPagination.java @@ -166,7 +166,7 @@ static class RangeFinder { public Optional next(DetailedJournalReceiver nextReceiver) { if (found) { - // if the next journal has wrapped use just go to the end + // if the next journal has wrapped use just go to the end of the previous one if (lastReceiver != null && nextReceiver.start().compareTo(lastReceiver.end()) < 0) { // we're at the end and we've processed it move start on to next receiver if (startEqualsEndAndProcessed(startPosition, lastReceiver)) { @@ -174,29 +174,21 @@ public Optional next(DetailedJournalReceiver nextReceiver) { } else { // the only way we can get here is if we have already checked for pagination - // when we found the start so we should never need to paginate - // it is inexpensive and safer to check - final Optional paginated = rangeWithinCurrentPosition(lastReceiver, - startPosition.getOffset()); - if (paginated.isPresent()) { - return paginated; - } return Optional.of(new PositionRange(false, startPosition, new JournalPosition(lastReceiver.end(), lastReceiver.info().receiver()))); } } final Optional r = rangeWithinCurrentPosition(nextReceiver, nextReceiver.start()); - if (r.isPresent()) { - return r; - } - } - if (nextReceiver.isSameReceiver(startPosition)) { - found = true; - final Optional r = rangeWithinCurrentPosition(nextReceiver, startPosition.getOffset()); - if (r.isPresent()) { - return r; - } + lastReceiver = nextReceiver; + return r; + } else { + if(nextReceiver.isSameReceiver(startPosition)) { + found = true; + final Optional r = rangeWithinCurrentPosition(nextReceiver, startPosition.getOffset()); + lastReceiver = nextReceiver; + return r; + } } lastReceiver = nextReceiver; return Optional.empty(); diff --git a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java index d2c5af8..487dcd7 100644 --- a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java +++ b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java @@ -83,7 +83,6 @@ public RetrieveJournal(RetrieveConfig config, JournalInfoRetrieval journalRetrie public boolean retrieveJournal(JournalProcessedPosition previousPosition) throws Exception { final PositionRange range = journalReceivers.findRange(config.as400().connection(), previousPosition); - return retrieveJournal(previousPosition, range); } diff --git a/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java b/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java index 3306948..6f7c28f 100644 --- a/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java +++ b/journal-parsing/src/test/java/io/debezium/ibmi/db2/journal/retrieve/ReceiverPaginationTest.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.ibm.as400.access.AS400; @@ -503,4 +505,112 @@ void testStartEqualsEndProcessedResetReceiversPaginate() { assertEquals(new JournalPosition(BigInteger.valueOf(5l), j2.info().receiver()), found.get().end()); assertFalse(found.get().startEqualsEnd()); } + + private static final Logger log = LoggerFactory.getLogger(ReceiverPaginationTest.class); + + @Test + void testStopBeforeJournalResetsPaginateOver() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(16), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end(), position.get().end().getOffset()); + + } + + @Test + void testStopBeforeJournalResetsPaginateExact() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(15), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end(), position.get().end().getOffset()); + + } + + @Test + void testStopOneBeforeJournalResetsPaginate() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1); + final List list = List.of(j1, j2, j3); + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5), + j1.info().receiver(), Instant.ofEpochSecond(0), true); + final Optional position = jreceivers.findPosition(start, BigInteger.valueOf(14), list, j1); + assertTrue(position.isPresent()); + assertEquals("j2", position.get().end().getReceiver().name()); + assertEquals(j2.end().subtract(BigInteger.ONE), position.get().end().getOffset()); + } + + + @Test + void testSkippingOverEndOfFirst() { + final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 40, journalInfo); + final DetailedJournalReceiver j0 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j0", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(111111), Optional.of(new JournalReceiver("j1", "jlib")), 1, 1); + final DetailedJournalReceiver j1 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(1), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1); + final DetailedJournalReceiver j2 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(21), BigInteger.valueOf(30), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1); + final DetailedJournalReceiver j3 = new DetailedJournalReceiver( + new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(3), + JournalStatus.OnlineSavedDetached, Optional.of(1)), + BigInteger.valueOf(31), BigInteger.valueOf(40), Optional.of(new JournalReceiver("j4", "jlib")), 1, 1); + final List list = List.of(j0, j1, j2, j3); + + final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(111111), + j0.info().receiver(), Instant.ofEpochSecond(10), true); + + final Optional found = jreceivers.findPosition(start, BigInteger.valueOf(40), list, j1); + assertEquals(start, found.get().start()); + assertEquals(new JournalPosition(BigInteger.valueOf(40), j3.info().receiver()), found.get().end()); + } + + }