Skip to content

Commit

Permalink
Reproduce IndexOutOfBoundsException
Browse files Browse the repository at this point in the history
  • Loading branch information
sudeshwasnik committed Dec 21, 2024
1 parent bf4ff0a commit c9a5732
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
// create tpChannel
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setRecordNumber(4)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
Expand All @@ -789,19 +789,21 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
i));
}

service.insert(blankRecords);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
// service.insert(blankRecords);
// TestUtils.assertWithRetry(
// () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);

// Insert another two records with offset gap that requires evolution: 3, 4
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION);
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(300, 3, topic, PARTITION);
gapRecords.remove(0);
service.insert(gapRecords);

blankRecords.addAll(gapRecords);
service.insert(blankRecords);

// With schematization, we need to resend a new batch should succeed even if there is an offset
// gap from the previous committed offset
if (withSchematization) {
service.insert(gapRecords);
service.insert(blankRecords);
}

TestUtils.assertWithRetry(
Expand Down

0 comments on commit c9a5732

Please sign in to comment.