From ad09741f1f27f6108418eba86e678379ab39502c Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 12 Apr 2018 17:46:48 +0200 Subject: [PATCH 1/4] ARUHA-1604: added flushing collected events when reaching stream timeout; --- .../nakadi/webservice/hila/HilaAT.java | 17 ++++++++++++++++ .../subscription/state/PartitionData.java | 6 +++++- .../subscription/state/StreamingState.java | 13 +++++++++--- .../subscription/state/PartitionDataTest.java | 20 +++++++++---------- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index fbe206c818..db3062b53e 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -101,6 +101,23 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { Assert.assertEquals(SC_NO_CONTENT, statusCode); } + @Test(timeout = 10000) + public void whenStreamTimeoutReachedThenEventsFlushed() throws Exception { + final TestStreamingClient client = TestStreamingClient + .create(URL, subscription.getId(), + "batch_flush_timeout=600&batch_limit=1000&stream_timeout=2&max_uncommitted_events=3") + .start(); + waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN)))); + + publishEvents(eventType.getName(), 4, x -> "{\"foo\":\"bar\"}"); + + // when stream_timeout is reached we should get 2 batches: + // first one containing 3 events, second one with debug message + waitFor(() -> assertThat(client.getBatches(), hasSize(2))); + assertThat(client.getBatches().get(0).getEvents(), hasSize(3)); + assertThat(client.getBatches().get(1).getEvents(), hasSize(0)); + } + @Test(timeout = 30000) public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() throws Exception { // write 4 events to event-type diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java index cf06d1637b..ed37c2aa4d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java @@ -56,12 +56,16 @@ class PartitionData { @Nullable List takeEventsToStream(final long currentTimeMillis, final int batchSize, - final long batchTimeoutMillis) { + final long batchTimeoutMillis, final boolean streamTimeoutReached) { final boolean countReached = (nakadiEvents.size() >= batchSize) && batchSize > 0; final boolean timeReached = (currentTimeMillis - lastSendMillis) >= batchTimeoutMillis; if (countReached || timeReached) { lastSendMillis = currentTimeMillis; return extract(batchSize); + } else if (streamTimeoutReached) { + lastSendMillis = currentTimeMillis; + final List extractedEvents = extract(batchSize); + return extractedEvents.isEmpty() ? null : extractedEvents; } else { return null; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 372b765896..3d10020263 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -95,9 +95,10 @@ public void onEnter() { scheduleTask(this::checkBatchTimeouts, getParameters().batchTimeoutMillis, TimeUnit.MILLISECONDS); scheduleTask(() -> { + streamToOutput(true); final String debugMessage = "Stream timeout reached"; - this.sendMetadata(debugMessage); - this.shutdownGracefully(debugMessage); + sendMetadata(debugMessage); + shutdownGracefully(debugMessage); }, getParameters().streamTimeoutMillis, TimeUnit.MILLISECONDS); @@ -221,6 +222,10 @@ private void checkBatchTimeouts() { } private void streamToOutput() { + streamToOutput(false); + } + + private void streamToOutput(final boolean streamTimeoutReached) { final long currentTimeMillis = System.currentTimeMillis(); int messagesAllowedToSend = (int) getMessagesAllowedToSend(); final boolean wasCommitted = isEverythingCommitted(); @@ -231,7 +236,8 @@ private void streamToOutput() { while (null != (toSend = e.getValue().takeEventsToStream( currentTimeMillis, Math.min(getParameters().batchLimitEvents, messagesAllowedToSend), - getParameters().batchTimeoutMillis))) { + getParameters().batchTimeoutMillis, + streamTimeoutReached))) { sentSomething |= !toSend.isEmpty(); flushData(e.getKey(), toSend, batchesSent == 0 ? Optional.of("Stream started") : Optional.empty()); this.sentEvents += toSend.size(); @@ -649,6 +655,7 @@ private void removeFromStreaming(final EventTypePartition key) { /** * If stream doesn't have any partitions - start timer that will close this session * in commitTimeout*2 if it doesn't get any partitions during that time + * * @param topology the new topology */ private void trackIdleness(final ZkSubscriptionClient.Topology topology) { diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java index 78f8f1405e..b2c14f5552 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java @@ -63,7 +63,7 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() { pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1))); } // Now say to it that it was sent - pd.takeEventsToStream(currentTimeMillis(), 1000, 0L); + pd.takeEventsToStream(currentTimeMillis(), 1000, 0L, false); assertEquals(100L, pd.getUnconfirmed()); for (long i = 0; i < 10; ++i) { final PartitionData.CommitResult cr = pd.onCommitOffset(createCursor(110L + i * 10L)); @@ -77,14 +77,14 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() { public void keepAliveCountShouldIncreaseOnEachEmptyCall() { final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 100; ++i) { - pd.takeEventsToStream(currentTimeMillis(), 10, 0L); + pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(i + 1, pd.getKeepAliveInARow()); } pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L))); assertEquals(100, pd.getKeepAliveInARow()); - pd.takeEventsToStream(currentTimeMillis(), 10, 0L); + pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(0, pd.getKeepAliveInARow()); - pd.takeEventsToStream(currentTimeMillis(), 10, 0L); + pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(1, pd.getKeepAliveInARow()); } @@ -97,26 +97,26 @@ public void eventsShouldBeStreamedOnTimeout() { for (int i = 0; i < 100; ++i) { pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); } - List data = pd.takeEventsToStream(currentTime, 1000, timeout); + List data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); assertEquals(0, pd.getKeepAliveInARow()); currentTime += timeout + 1; - data = pd.takeEventsToStream(currentTime, 1000, timeout); + data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNotNull(data); assertEquals(100, data.size()); for (int i = 100; i < 200; ++i) { pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); } - data = pd.takeEventsToStream(currentTime, 1000, timeout); + data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); assertEquals(0, pd.getKeepAliveInARow()); currentTime += timeout + 1; - data = pd.takeEventsToStream(currentTime, 1000, timeout); + data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNotNull(data); assertEquals(100, data.size()); } @@ -128,8 +128,8 @@ public void eventsShouldBeStreamedOnBatchSize() { for (int i = 0; i < 100; ++i) { pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); } - assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout)); - final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout); + assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout, false)); + final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout, false); assertNotNull(eventsToStream); assertEquals(99, eventsToStream.size()); } From 09b5e25b544cb9a6cd81ce6aaabe282475109493 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 12 Apr 2018 18:03:41 +0200 Subject: [PATCH 2/4] ARUHA-1604: updated changelog; --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 073e07757c..b393699597 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- Added flushing of collected events when reaching stream_timeout in subscription API + ## [2.6.3] - 2018-04-10 ### Fixed From b12d09a626322fc388df93542c6ba3fa39d51314 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 12 Apr 2018 18:22:25 +0200 Subject: [PATCH 3/4] ARUHA-1604: extended tests; --- .../subscription/state/PartitionDataTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java index b2c14f5552..af2324ff42 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java @@ -133,4 +133,24 @@ public void eventsShouldBeStreamedOnBatchSize() { assertNotNull(eventsToStream); assertEquals(99, eventsToStream.size()); } + + @Test + public void eventsShouldBeStreamedOnStreamTimeout() { + final long timeout = TimeUnit.SECONDS.toMillis(100); + final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); + for (int i = 0; i < 10; ++i) { + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + } + assertEquals(10, pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true).size()); + } + + @Test + public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenNoEvents() { + final long timeout = TimeUnit.SECONDS.toMillis(100); + final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); + for (int i = 0; i < 10; ++i) { + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + } + assertNull(pd.takeEventsToStream(currentTimeMillis(), 0, timeout, true)); + } } From 0b8a0e19138906dfe173d9af6c1babb12e872ea9 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Tue, 24 Apr 2018 15:35:03 +0200 Subject: [PATCH 4/4] ARUHA-1604: changed test to fail in old version; --- .../java/org/zalando/nakadi/webservice/hila/HilaAT.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index db3062b53e..c04ecfaf7b 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -105,17 +105,18 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { public void whenStreamTimeoutReachedThenEventsFlushed() throws Exception { final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), - "batch_flush_timeout=600&batch_limit=1000&stream_timeout=2&max_uncommitted_events=3") + "batch_flush_timeout=600&batch_limit=1000&stream_timeout=2&max_uncommitted_events=1000") .start(); waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN)))); publishEvents(eventType.getName(), 4, x -> "{\"foo\":\"bar\"}"); // when stream_timeout is reached we should get 2 batches: - // first one containing 3 events, second one with debug message + // first one containing 4 events, second one with debug message waitFor(() -> assertThat(client.getBatches(), hasSize(2))); - assertThat(client.getBatches().get(0).getEvents(), hasSize(3)); + assertThat(client.getBatches().get(0).getEvents(), hasSize(4)); assertThat(client.getBatches().get(1).getEvents(), hasSize(0)); + System.out.println(client.getBatches()); } @Test(timeout = 30000)