Skip to content

Commit

Permalink
Fix long running RangeQueryIntegrationTest.
Browse files Browse the repository at this point in the history
  • Loading branch information
k-raina committed Nov 24, 2024
1 parent 70babd5 commit 39ab973
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -59,6 +60,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -158,16 +160,16 @@ public void cleanup() throws InterruptedException {

@ParameterizedTest
@MethodSource("data")
public void testStoreConfig(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) throws Exception {
public void testStoreConfig(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward, final TestInfo testInfo) throws Exception {
final String appID = safeUniqueTestName(testInfo);
final StreamsBuilder builder = new StreamsBuilder();
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching);
builder.table(inputStream, stateStoreConfig);

try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);

writeInputData();

final ReadOnlyKeyValueStore<String, String> stateStore = IntegrationTestUtils.getStore(1000_000L, TABLE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());

// wait for the store to populate
Expand Down

0 comments on commit 39ab973

Please sign in to comment.