Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into ARUHA-1624-out-of-m…
Browse files Browse the repository at this point in the history
…emory

# Conflicts:
#	CHANGELOG.md
  • Loading branch information
v-stepanov committed Apr 25, 2018
2 parents f5a7d68 + f56de8a commit 9a90b69
Show file tree
Hide file tree
Showing 45 changed files with 478 additions and 229 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.6.4] - 2018-04-25

### Added
- Add optional status to the /subscriptions endpoint

### Fixed
- Fixed commit for subscriptions that use direct assignment of partitions
- Fixed OutOfMemoryError when using huge values for batch_limit and max_uncommitted_events


## [2.6.3] - 2018-04-10

### Fixed
Expand Down
53 changes: 53 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,11 @@ paths:
required: false
default: 0
minimum: 0
- name: show_status
in: query
description: show subscription status
type: boolean
default: false
responses:
'200':
description: OK
Expand Down Expand Up @@ -2327,6 +2332,13 @@ definitions:
List of cursors to start reading from. This property is required when `read_from` = `cursors`.
The initial cursors should cover all partitions of subscription.
Clients will get events starting from next offset positions.
status:
type: array
description: |
Subscription status. This data is only available when querying the subscriptions endpoint for
status.
items:
$ref: '#/definitions/SubscriptionEventTypeStatus'
required:
- owning_application
- event_types
Expand Down Expand Up @@ -2677,6 +2689,7 @@ definitions:
properties:
partition:
type: string
description: the partition id
state:
type: string
description: |
Expand Down Expand Up @@ -2706,6 +2719,46 @@ definitions:
- event_type
- partitions

SubscriptionEventTypeStatus:
type: object
description: Status of one event-type within a context of subscription
properties:
event_type:
type: string
description: event-type name
partitions:
type: array
description: status of partitions of this event-type
items:
type: object
description: status of partition within a subscription context
properties:
partition:
type: string
description: The partition id
state:
type: string
description: |
The state of this partition in current subscription. Currently following values are possible:
- `unassigned`: the partition is currently not assigned to any client;
- `reassigning`: the partition is currently reasssigning from one client to another;
- `assigned`: the partition is assigned to a client.
stream_id:
type: string
description: the id of the stream that consumes data from this partition
assignment_type:
type: string
description: |
- `direct`: partition can't be transferred to another stream until the stream is closed;
- `auto`: partition can be transferred to another stream in case of rebalance, or if another stream
requests to read from this partition.
required:
- partition
- state
required:
- event_type
- partitions

BatchItemResponse:
description: |
A status corresponding to one individual Event's publishing attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.util.HashGenerator;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
Expand Down Expand Up @@ -105,7 +105,8 @@ public void whenGetSubscriptionByKeyPropertiesThenOk() throws Exception {
}

@Test
public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk() throws ServiceUnavailableException {
public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk()
throws ServiceTemporarilyUnavailableException {

final String owningApp = TestUtils.randomUUID();
final String owningApp2 = TestUtils.randomUUID();
Expand Down Expand Up @@ -133,7 +134,7 @@ public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk() throws
}

@Test
public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceUnavailableException {
public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceTemporarilyUnavailableException {
final String et1 = TestUtils.randomUUID();
final String et2 = TestUtils.randomUUID();
final String et3 = TestUtils.randomUUID();
Expand All @@ -159,7 +160,7 @@ public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceUnav
}

@Test
public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceUnavailableException {
public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceTemporarilyUnavailableException {
final String owningApp = TestUtils.randomUUID();
final List<Subscription> testSubscriptions = createRandomSubscriptions(10, owningApp);
testSubscriptions.forEach(this::insertSubscriptionToDB);
Expand All @@ -173,7 +174,8 @@ public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceUnav
}

@Test
public void whenDeleteSubscriptionThenOk() throws ServiceUnavailableException, NoSuchSubscriptionException {
public void whenDeleteSubscriptionThenOk()
throws ServiceTemporarilyUnavailableException, NoSuchSubscriptionException {
final Subscription subscription = RandomSubscriptionBuilder.builder().build();
insertSubscriptionToDB(subscription);

Expand All @@ -185,7 +187,8 @@ public void whenDeleteSubscriptionThenOk() throws ServiceUnavailableException, N
}

@Test(expected = NoSuchSubscriptionException.class)
public void whenDeleteNoneExistingConnectionThenNoSuchSubscriptionException() throws ServiceUnavailableException,
public void whenDeleteNoneExistingConnectionThenNoSuchSubscriptionException()
throws ServiceTemporarilyUnavailableException,
NoSuchSubscriptionException {
repository.deleteSubscription("some-dummy-id");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.zalando.nakadi.webservice.hila;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -301,6 +303,29 @@ public void whenNoFreeSlotsForAutoClientThenConflict() throws IOException, Inter
waitFor(() -> assertThat(autoClient2.getResponseCode(), is(HttpStatus.CONFLICT.value())));
}

@Test(timeout = 10000)
public void testCommitWhenDirectAssignment() throws Exception {
// connect with 1 stream directly requesting one partition
final TestStreamingClient client = new TestStreamingClient(URL, subscription.getId(), "batch_flush_timeout=1",
Optional.empty(),
Optional.of("{\"partitions\":[" +
"{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"0\"}]}"));
client.start();
// wait for rebalance to finish
waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), Matchers.is(1)));
// publish 1 event
publishBusinessEventWithUserDefinedPartition(eventType.getName(), 1, x -> "blah", x -> "0");
// wait for event to come
waitFor(() -> assertThat(client.getBatches(), hasSize(1)));

// commit cursor
final int commitStatusCode = commitCursors(subscription.getId(),
ImmutableList.of(client.getBatches().get(0).getCursor()), client.getSessionId());

// check that we get 204
assertThat(commitStatusCode, is(HttpStatus.NO_CONTENT.value()));
}

public List<SubscriptionCursor> getLastCursorsForPartitions(final TestStreamingClient client,
final Set<String> partitions) {
if (!client.getBatches().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createBusinessEventTypeWithPartitions;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventTypeFromBegin;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;

public class SubscriptionAT extends BaseAT {
Expand Down Expand Up @@ -368,7 +370,7 @@ public void testDeleteEventTypeRestriction() throws Exception {
}

@Test
public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IOException {
public void whenStatsOnNotInitializedSubscriptionThenCorrectResponse() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventType(et);
final Response response = when().get("/subscriptions/{sid}/stats", s.getId())
Expand All @@ -386,10 +388,63 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO
Assert.assertNotNull(partition.getPartition());
Assert.assertEquals("", partition.getStreamId());
Assert.assertNull(partition.getUnconsumedEvents());
Assert.assertEquals(partition.getState(), "unassigned");
Assert.assertEquals("unassigned", partition.getState());
}
}

@Test
public void whenLightStatsOnNotInitializedSubscriptionThenCorrectResponse() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventType(et);
final String owningApplication = s.getOwningApplication();
final Response response = when()
.get("/subscriptions?show_status=true&owning_application=" + owningApplication)
.thenReturn();
final ItemsWrapper<Subscription> subsItems = MAPPER.readValue(response.print(),
new TypeReference<ItemsWrapper<Subscription>>(){});
for (final Subscription subscription: subsItems.getItems()) {
if (subscription.getId().equals(s.getId())) {
Assert.assertNotNull(subscription.getStatus());
Assert.assertEquals("unassigned", subscription.getStatus().get(0).getPartitions().get(0).getState());
Assert.assertEquals("", subscription.getStatus().get(0).getPartitions().get(0).getStreamId());
return;
}
}
Assert.assertTrue(false);
}

@Test
public void whenLightStatsOnActiveSubscriptionThenCorrectRespones() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventTypeFromBegin(et);
final String owningApplication = s.getOwningApplication();

publishEvents(et, 15, i -> "{\"foo\":\"bar\"}");

final TestStreamingClient client = TestStreamingClient
.create(URL, s.getId(), "max_uncommitted_events=20")
.start();
waitFor(() -> assertThat(client.getBatches(), hasSize(15)));

final Response response = when()
.get("/subscriptions?show_status=true&owning_application=" + owningApplication)
.thenReturn();
final ItemsWrapper<Subscription> subsItems = MAPPER.readValue(response.print(),
new TypeReference<ItemsWrapper<Subscription>>(){});
for (final Subscription subscription: subsItems.getItems()) {
if (subscription.getId().equals(s.getId())) {
Assert.assertNotNull(subscription.getStatus());
Assert.assertEquals("assigned", subscription.getStatus().get(0).getPartitions().get(0).getState());
Assert.assertEquals(client.getSessionId(),
subscription.getStatus().get(0).getPartitions().get(0).getStreamId());
Assert.assertEquals(SubscriptionEventTypeStats.Partition.AssignmentType.AUTO,
subscription.getStatus().get(0).getPartitions().get(0).getAssignmentType());
return;
}
}
Assert.assertTrue(false);
}

@Test
public void whenStreamDuplicatePartitionsThenUnprocessableEntity() throws IOException {
final String et = createEventType().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ public static Subscription createSubscriptionForEventType(final String eventType
return createSubscription(subscriptionBase);
}

public static Subscription createSubscriptionForEventTypeFromBegin(final String eventType) throws IOException {
final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.withEventType(eventType)
.withStartFrom(SubscriptionBase.InitialPosition.BEGIN)
.buildSubscriptionBase();
return createSubscription(subscriptionBase);
}

public static Subscription createSubscription(final SubscriptionBase subscription) throws IOException {
return createSubscription(given(), subscription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.NotFoundException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.CursorConversionException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.CursorConverter;
Expand Down Expand Up @@ -82,7 +82,7 @@ public ResponseEntity<?> getDistance(@PathVariable("eventTypeName") final String
.convert(eventTypeName, query.getFinalCursor());
final Long distance = cursorOperationsService.calculateDistance(initialCursor, finalCursor);
query.setDistance(distance);
} catch (InternalNakadiException | ServiceUnavailableException e) {
} catch (InternalNakadiException | ServiceTemporarilyUnavailableException e) {
throw new MyNakadiRuntimeException1("problem calculating cursors distance", e);
} catch (final NoSuchEventTypeException e) {
throw new NotFoundException("event type not found", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException;
import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException;
import org.zalando.nakadi.exceptions.runtime.RequestInProgressException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorTokenService;
Expand All @@ -47,6 +47,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.springframework.http.ResponseEntity.noContent;
import static org.springframework.http.ResponseEntity.ok;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;
Expand Down Expand Up @@ -121,6 +122,9 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
}
} catch (final NoSuchEventTypeException | InvalidCursorException e) {
return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request);
} catch (final ServiceTemporarilyUnavailableException e) {
LOG.error("Failed to commit cursors", e);
return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request);
} catch (final NakadiException e) {
LOG.error("Failed to commit cursors", e);
return create(e.asProblem(), request);
Expand Down Expand Up @@ -150,7 +154,8 @@ public ResponseEntity<?> resetCursors(

private List<NakadiCursor> convertToNakadiCursors(
final ItemsWrapper<? extends SubscriptionCursorWithoutToken> cursors) throws
InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException {
InternalNakadiException, NoSuchEventTypeException, ServiceTemporarilyUnavailableException,
InvalidCursorException {
final List<NakadiCursor> nakadiCursors = new ArrayList<>();
for (final SubscriptionCursorWithoutToken cursor : cursors.getItems()) {
nakadiCursors.add(cursorConverter.convert(cursor));
Expand All @@ -169,7 +174,7 @@ public ResponseEntity<Problem> handleInvalidStreamId(final InvalidStreamIdExcept
public ResponseEntity<Problem> handleUnableProcessException(final RuntimeException ex,
final NativeWebRequest request) {
LOG.debug(ex.getMessage(), ex);
return Responses.create(Response.Status.SERVICE_UNAVAILABLE, ex.getMessage(), request);
return Responses.create(SERVICE_UNAVAILABLE, ex.getMessage(), request);
}

@ExceptionHandler(RequestInProgressException.class)
Expand Down
Loading

0 comments on commit 9a90b69

Please sign in to comment.