Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #861 from zalando/light-subscription-stats
Browse files Browse the repository at this point in the history
Implement light subscriptions stats and refactor
  • Loading branch information
v-stepanov authored Apr 25, 2018
2 parents 25e9497 + e06f28d commit f56de8a
Show file tree
Hide file tree
Showing 44 changed files with 450 additions and 229 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.6.4] - 2018-04-25

### Added
- Add optional status to the /subscriptions endpoint

### Fixed
- Fixed commit for subscriptions that use direct assignment of partitions

Expand Down
53 changes: 53 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,11 @@ paths:
required: false
default: 0
minimum: 0
- name: show_status
in: query
description: show subscription status
type: boolean
default: false
responses:
'200':
description: OK
Expand Down Expand Up @@ -2327,6 +2332,13 @@ definitions:
List of cursors to start reading from. This property is required when `read_from` = `cursors`.
The initial cursors should cover all partitions of subscription.
Clients will get events starting from next offset positions.
status:
type: array
description: |
Subscription status. This data is only available when querying the subscriptions endpoint for
status.
items:
$ref: '#/definitions/SubscriptionEventTypeStatus'
required:
- owning_application
- event_types
Expand Down Expand Up @@ -2677,6 +2689,7 @@ definitions:
properties:
partition:
type: string
description: the partition id
state:
type: string
description: |
Expand Down Expand Up @@ -2706,6 +2719,46 @@ definitions:
- event_type
- partitions

SubscriptionEventTypeStatus:
type: object
description: Status of one event-type within a context of subscription
properties:
event_type:
type: string
description: event-type name
partitions:
type: array
description: status of partitions of this event-type
items:
type: object
description: status of partition within a subscription context
properties:
partition:
type: string
description: The partition id
state:
type: string
description: |
The state of this partition in current subscription. Currently following values are possible:
- `unassigned`: the partition is currently not assigned to any client;
- `reassigning`: the partition is currently reasssigning from one client to another;
- `assigned`: the partition is assigned to a client.
stream_id:
type: string
description: the id of the stream that consumes data from this partition
assignment_type:
type: string
description: |
- `direct`: partition can't be transferred to another stream until the stream is closed;
- `auto`: partition can be transferred to another stream in case of rebalance, or if another stream
requests to read from this partition.
required:
- partition
- state
required:
- event_type
- partitions

BatchItemResponse:
description: |
A status corresponding to one individual Event's publishing attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.util.HashGenerator;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
Expand Down Expand Up @@ -105,7 +105,8 @@ public void whenGetSubscriptionByKeyPropertiesThenOk() throws Exception {
}

@Test
public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk() throws ServiceUnavailableException {
public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk()
throws ServiceTemporarilyUnavailableException {

final String owningApp = TestUtils.randomUUID();
final String owningApp2 = TestUtils.randomUUID();
Expand Down Expand Up @@ -133,7 +134,7 @@ public void whenListSubscriptionsByOwningApplicationAndEventTypeThenOk() throws
}

@Test
public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceUnavailableException {
public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceTemporarilyUnavailableException {
final String et1 = TestUtils.randomUUID();
final String et2 = TestUtils.randomUUID();
final String et3 = TestUtils.randomUUID();
Expand All @@ -159,7 +160,7 @@ public void whenListSubscriptionsByMultipleEventTypesThenOk() throws ServiceUnav
}

@Test
public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceUnavailableException {
public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceTemporarilyUnavailableException {
final String owningApp = TestUtils.randomUUID();
final List<Subscription> testSubscriptions = createRandomSubscriptions(10, owningApp);
testSubscriptions.forEach(this::insertSubscriptionToDB);
Expand All @@ -173,7 +174,8 @@ public void whenListSubscriptionsLimitAndOffsetAreRespected() throws ServiceUnav
}

@Test
public void whenDeleteSubscriptionThenOk() throws ServiceUnavailableException, NoSuchSubscriptionException {
public void whenDeleteSubscriptionThenOk()
throws ServiceTemporarilyUnavailableException, NoSuchSubscriptionException {
final Subscription subscription = RandomSubscriptionBuilder.builder().build();
insertSubscriptionToDB(subscription);

Expand All @@ -185,7 +187,8 @@ public void whenDeleteSubscriptionThenOk() throws ServiceUnavailableException, N
}

@Test(expected = NoSuchSubscriptionException.class)
public void whenDeleteNoneExistingConnectionThenNoSuchSubscriptionException() throws ServiceUnavailableException,
public void whenDeleteNoneExistingConnectionThenNoSuchSubscriptionException()
throws ServiceTemporarilyUnavailableException,
NoSuchSubscriptionException {
repository.deleteSubscription("some-dummy-id");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createBusinessEventTypeWithPartitions;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventTypeFromBegin;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;

public class SubscriptionAT extends BaseAT {
Expand Down Expand Up @@ -368,7 +370,7 @@ public void testDeleteEventTypeRestriction() throws Exception {
}

@Test
public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IOException {
public void whenStatsOnNotInitializedSubscriptionThenCorrectResponse() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventType(et);
final Response response = when().get("/subscriptions/{sid}/stats", s.getId())
Expand All @@ -386,10 +388,63 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO
Assert.assertNotNull(partition.getPartition());
Assert.assertEquals("", partition.getStreamId());
Assert.assertNull(partition.getUnconsumedEvents());
Assert.assertEquals(partition.getState(), "unassigned");
Assert.assertEquals("unassigned", partition.getState());
}
}

@Test
public void whenLightStatsOnNotInitializedSubscriptionThenCorrectResponse() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventType(et);
final String owningApplication = s.getOwningApplication();
final Response response = when()
.get("/subscriptions?show_status=true&owning_application=" + owningApplication)
.thenReturn();
final ItemsWrapper<Subscription> subsItems = MAPPER.readValue(response.print(),
new TypeReference<ItemsWrapper<Subscription>>(){});
for (final Subscription subscription: subsItems.getItems()) {
if (subscription.getId().equals(s.getId())) {
Assert.assertNotNull(subscription.getStatus());
Assert.assertEquals("unassigned", subscription.getStatus().get(0).getPartitions().get(0).getState());
Assert.assertEquals("", subscription.getStatus().get(0).getPartitions().get(0).getStreamId());
return;
}
}
Assert.assertTrue(false);
}

@Test
public void whenLightStatsOnActiveSubscriptionThenCorrectRespones() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventTypeFromBegin(et);
final String owningApplication = s.getOwningApplication();

publishEvents(et, 15, i -> "{\"foo\":\"bar\"}");

final TestStreamingClient client = TestStreamingClient
.create(URL, s.getId(), "max_uncommitted_events=20")
.start();
waitFor(() -> assertThat(client.getBatches(), hasSize(15)));

final Response response = when()
.get("/subscriptions?show_status=true&owning_application=" + owningApplication)
.thenReturn();
final ItemsWrapper<Subscription> subsItems = MAPPER.readValue(response.print(),
new TypeReference<ItemsWrapper<Subscription>>(){});
for (final Subscription subscription: subsItems.getItems()) {
if (subscription.getId().equals(s.getId())) {
Assert.assertNotNull(subscription.getStatus());
Assert.assertEquals("assigned", subscription.getStatus().get(0).getPartitions().get(0).getState());
Assert.assertEquals(client.getSessionId(),
subscription.getStatus().get(0).getPartitions().get(0).getStreamId());
Assert.assertEquals(SubscriptionEventTypeStats.Partition.AssignmentType.AUTO,
subscription.getStatus().get(0).getPartitions().get(0).getAssignmentType());
return;
}
}
Assert.assertTrue(false);
}

@Test
public void whenStreamDuplicatePartitionsThenUnprocessableEntity() throws IOException {
final String et = createEventType().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ public static Subscription createSubscriptionForEventType(final String eventType
return createSubscription(subscriptionBase);
}

public static Subscription createSubscriptionForEventTypeFromBegin(final String eventType) throws IOException {
final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.withEventType(eventType)
.withStartFrom(SubscriptionBase.InitialPosition.BEGIN)
.buildSubscriptionBase();
return createSubscription(subscriptionBase);
}

public static Subscription createSubscription(final SubscriptionBase subscription) throws IOException {
return createSubscription(given(), subscription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.NotFoundException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.CursorConversionException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.CursorConverter;
Expand Down Expand Up @@ -82,7 +82,7 @@ public ResponseEntity<?> getDistance(@PathVariable("eventTypeName") final String
.convert(eventTypeName, query.getFinalCursor());
final Long distance = cursorOperationsService.calculateDistance(initialCursor, finalCursor);
query.setDistance(distance);
} catch (InternalNakadiException | ServiceUnavailableException e) {
} catch (InternalNakadiException | ServiceTemporarilyUnavailableException e) {
throw new MyNakadiRuntimeException1("problem calculating cursors distance", e);
} catch (final NoSuchEventTypeException e) {
throw new NotFoundException("event type not found", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException;
import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException;
import org.zalando.nakadi.exceptions.runtime.RequestInProgressException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorTokenService;
Expand All @@ -47,6 +47,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.springframework.http.ResponseEntity.noContent;
import static org.springframework.http.ResponseEntity.ok;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;
Expand Down Expand Up @@ -121,6 +122,9 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
}
} catch (final NoSuchEventTypeException | InvalidCursorException e) {
return create(Problem.valueOf(UNPROCESSABLE_ENTITY, e.getMessage()), request);
} catch (final ServiceTemporarilyUnavailableException e) {
LOG.error("Failed to commit cursors", e);
return create(Problem.valueOf(SERVICE_UNAVAILABLE, e.getMessage()), request);
} catch (final NakadiException e) {
LOG.error("Failed to commit cursors", e);
return create(e.asProblem(), request);
Expand Down Expand Up @@ -150,7 +154,8 @@ public ResponseEntity<?> resetCursors(

private List<NakadiCursor> convertToNakadiCursors(
final ItemsWrapper<? extends SubscriptionCursorWithoutToken> cursors) throws
InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException {
InternalNakadiException, NoSuchEventTypeException, ServiceTemporarilyUnavailableException,
InvalidCursorException {
final List<NakadiCursor> nakadiCursors = new ArrayList<>();
for (final SubscriptionCursorWithoutToken cursor : cursors.getItems()) {
nakadiCursors.add(cursorConverter.convert(cursor));
Expand All @@ -169,7 +174,7 @@ public ResponseEntity<Problem> handleInvalidStreamId(final InvalidStreamIdExcept
public ResponseEntity<Problem> handleUnableProcessException(final RuntimeException ex,
final NativeWebRequest request) {
LOG.debug(ex.getMessage(), ex);
return Responses.create(Response.Status.SERVICE_UNAVAILABLE, ex.getMessage(), request);
return Responses.create(SERVICE_UNAVAILABLE, ex.getMessage(), request);
}

@ExceptionHandler(RequestInProgressException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NoConnectionSlotsException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnparseableCursorException;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
Expand All @@ -50,8 +49,8 @@
import org.zalando.nakadi.service.EventStreamConfig;
import org.zalando.nakadi.service.EventStreamFactory;
import org.zalando.nakadi.service.EventTypeChangeListener;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.util.FlowIdUtils;
import org.zalando.nakadi.view.Cursor;
import org.zalando.problem.Problem;
Expand All @@ -76,6 +75,7 @@
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.zalando.nakadi.metrics.MetricUtils.metricNameFor;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.LIMIT_CONSUMERS_NUMBER;

Expand Down Expand Up @@ -133,7 +133,7 @@ public EventStreamController(final EventTypeRepository eventTypeRepository,

@VisibleForTesting
List<NakadiCursor> getStreamingStart(final EventType eventType, final String cursorsStr)
throws UnparseableCursorException, ServiceUnavailableException, InvalidCursorException,
throws UnparseableCursorException, ServiceTemporarilyUnavailableException, InvalidCursorException,
InternalNakadiException, NoSuchEventTypeException {
List<Cursor> cursors = null;
if (cursorsStr != null) {
Expand Down Expand Up @@ -288,6 +288,9 @@ public StreamingResponseBody streamEvents(
} catch (final NoConnectionSlotsException e) {
LOG.debug("Connection creation failed due to exceeding max connection count");
writeProblemResponse(response, outputStream, e.asProblem());
} catch (final ServiceTemporarilyUnavailableException e) {
LOG.error("Error while trying to stream events.", e);
writeProblemResponse(response, outputStream, SERVICE_UNAVAILABLE, e.getMessage());
} catch (final NakadiException e) {
LOG.error("Error while trying to stream events.", e);
writeProblemResponse(response, outputStream, e.asProblem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public ResponseEntity<Problem> handleCursorConversionException(final CursorConve
}

@ExceptionHandler(ServiceTemporarilyUnavailableException.class)
public ResponseEntity<Problem> handleServiceTemporaryUnavailableException(
public ResponseEntity<Problem> handleServiceTemporarilyUnavailableException(
final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) {
LOG.error(exception.getMessage(), exception);
return Responses.create(Response.Status.SERVICE_UNAVAILABLE, exception.getMessage(), request);
Expand Down
Loading

0 comments on commit f56de8a

Please sign in to comment.