Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/improved-shut…
Browse files Browse the repository at this point in the history
…down-handling

# Conflicts:
#	src/main/java/de/telekom/horizon/pulsar/service/SseService.java
  • Loading branch information
mherwig committed Dec 2, 2024
2 parents b5dd931 + f5c7b16 commit 7af9d83
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 114 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fabric8Version=5.12.4
horizonParentVersion=4.1.0
horizonParentVersion=4.4.0
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
package de.telekom.horizon.pulsar.api;

import de.telekom.eni.pandora.horizon.model.common.ProblemMessage;
import de.telekom.horizon.pulsar.exception.ConnectionCutOutException;
import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException;
import de.telekom.horizon.pulsar.exception.QueueWaitTimeoutException;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.exception.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.ClientAbortException;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -64,6 +61,7 @@ public RestResponseEntityExceptionHandler(ApplicationContext applicationContext)
*/
@ExceptionHandler(value = {
ConnectionCutOutException.class,
StreamLimitExceededException.class
})
@ResponseStatus(HttpStatus.OK)
protected ResponseEntity<Object> handleCutOut(Exception e, WebRequest request) {
Expand Down
34 changes: 29 additions & 5 deletions src/main/java/de/telekom/horizon/pulsar/api/SseController.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package de.telekom.horizon.pulsar.api;

import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -56,17 +58,24 @@ public ResponseEntity<Void> headRequest(@PathVariable String environment) {
/**
* Retrieves SSE stream for the specified subscriptionId.
*
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @param includeHttpHeaders Whether to include HTTP headers in the response.
* @param accept The value of the "Accept" header in the request.
* @param maxNumber Whether to terminate after a certain number of consumed events.
* @param maxMinutes Whether to terminate after a certain time (in minutes).
* @param maxBytes Whether to terminate after a certain number of bytes consumed.
* @param accept The value of the "Accept" header in the request.
* @return A response containing a {@code ResponseBodyEmitter} for SSE streaming.
* @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription.
*/
@GetMapping(value = "/sse/{subscriptionId}", produces = {MediaType.ALL_VALUE, APPLICATION_STREAM_JSON_VALUE, MediaType.TEXT_EVENT_STREAM_VALUE})
public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String environment,
@PathVariable String subscriptionId,
@RequestParam(defaultValue = "false") boolean includeHttpHeaders,
@RequestParam(defaultValue = "0") int maxNumber,
@RequestParam(defaultValue = "0") int maxMinutes,
@RequestParam(defaultValue = "0") int maxBytes,
@RequestHeader(value = "Last-Event-ID", required = false) String offset,
@RequestHeader(value = HttpHeaders.ACCEPT, required = false) String accept) throws SubscriberDoesNotMatchSubscriptionException {

sseService.validateSubscriberIdForSubscription(environment, subscriptionId);
Expand All @@ -76,14 +85,29 @@ public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String env
accept = APPLICATION_STREAM_JSON_VALUE;
}

var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders);
var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, StringUtils.isNotEmpty(offset) || includeHttpHeaders, offset, StreamLimit.of(maxNumber, maxMinutes, maxBytes));

var responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, accept);
responseHeaders.add(HttpHeaders.TRANSFER_ENCODING, "chunked");
responseHeaders.add(HttpHeaders.CACHE_CONTROL, "no-cache");
responseHeaders.add("X-Accel-Buffering", "no");

return new ResponseEntity<>(responseContainer.getEmitter(), responseHeaders, HttpStatus.OK);
}

/**
* Stops an active SSE stream for the specified subscriptionId.
*
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription.
*/
@PostMapping(value = "/sse/{subscriptionId}/terminate", produces = {MediaType.ALL_VALUE, APPLICATION_STREAM_JSON_VALUE, MediaType.TEXT_EVENT_STREAM_VALUE})
public ResponseEntity<Void> terminateSseStream(@PathVariable String environment, @PathVariable String subscriptionId) throws SubscriberDoesNotMatchSubscriptionException {

sseService.validateSubscriberIdForSubscription(environment, subscriptionId);
sseService.stopEmittingEvents(subscriptionId);

return ResponseEntity.status(HttpStatus.OK).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.exception;

public class StreamLimitExceededException extends HorizonPulsarException {
public StreamLimitExceededException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Context class representing the context of an event message in a subscription.
*
* This class encapsulates information related to a subscription event message, including
* whether to include HTTP headers, and optional tracing components such as a span and span in scope.
* whether to include HTTP headers or stream limits, and optional tracing components such as a span and span in scope.
* It provides a method to finish the span and span in scope if they are present.
*/

Expand All @@ -28,6 +28,11 @@ public class EventMessageContext {
private SubscriptionEventMessage subscriptionEventMessage;
@Getter
private Boolean includeHttpHeaders;
@Getter
private StreamLimit streamLimit;
@Getter
private boolean ignoreDeduplication;

private Span span;
private Tracer.SpanInScope spanInScope;
public void finishSpan() {
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/de/telekom/horizon/pulsar/helper/StreamLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.helper;


import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* Class that contains any streaming limits provided by the customer.
*
* This class encapsulates all possible streaming limits that have been provided by the customer when
* requesting a new stream. The streaming limits will ensure that a active stream terminates early on when exceeded.
* Currently, a customer can specify that the stream should terminate when a specific number of events have been consumed
* or after a certain time (in minutes) or after exceeding a certain number of bytes which have been consumed.
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class StreamLimit {
private int maxNumber;
private int maxMinutes;
private long maxBytes;

public static StreamLimit of(int maxNumber, int maxMinutes, int maxBytes) {
return new StreamLimit(maxNumber, maxMinutes, maxBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
import de.telekom.eni.pandora.horizon.model.meta.EventRetentionTime;
import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument;
import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo;
import de.telekom.eni.pandora.horizon.tracing.HorizonTracer;
import de.telekom.horizon.pulsar.config.PulsarConfig;
import de.telekom.horizon.pulsar.exception.CouldNotFindEventMessageException;
import de.telekom.horizon.pulsar.exception.CouldNotPickMessageException;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.EventMessageContext;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.utils.KafkaPicker;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -54,22 +58,26 @@ public class EventMessageSupplier implements Supplier<EventMessageContext> {
@Getter
private final String subscriptionId;
private final Boolean includeHttpHeaders;
private final StreamLimit streamLimit;
private final KafkaPicker kafkaPicker;
private final EventWriter eventWriter;
private final MessageStateMongoRepo messageStateMongoRepo;
private final HorizonTracer tracingHelper;
private final ConcurrentLinkedQueue<State> messageStates = new ConcurrentLinkedQueue<>();
private Instant lastPoll;
private String currentOffset;
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Constructs an instance of {@code EventMessageSupplier}.
*
* @param subscriptionId The subscriptionId for which messages are fetched.
* @param factory The {@link SseTaskFactory} used for obtaining related components.
* @param subscriptionId The subscriptionId for which messages are fetched.
* @param factory The {@link SseTaskFactory} used for obtaining related components.
* @param includeHttpHeaders Boolean flag indicating whether to include HTTP headers in the generated {@code EventMessageContext}.
* @param startingOffset Enables offset based streaming. Specifies the offset (message id) of the last received event message.
* @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early.
*/
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders) {
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, String startingOffset, StreamLimit streamLimit) {
this.subscriptionId = subscriptionId;

this.pulsarConfig = factory.getPulsarConfig();
Expand All @@ -78,6 +86,8 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole
this.eventWriter = factory.getEventWriter();
this.tracingHelper = factory.getTracingHelper();
this.includeHttpHeaders = includeHttpHeaders;
this.currentOffset = startingOffset;
this.streamLimit = streamLimit;
}

/**
Expand All @@ -95,6 +105,7 @@ public EventMessageContext get() {

if (!messageStates.isEmpty()) {
var state = messageStates.poll();
var ignoreDeduplication = StringUtils.isNotEmpty(currentOffset);

// TODO: these spans get duplicated cause of the vortex latency - will be resolved DHEI-13764

Expand All @@ -117,12 +128,13 @@ public EventMessageContext get() {
var errorMessage = String.format("Event message %s did not match subscriptionId %s", state.getUuid(), state.getSubscriptionId());
throw new SubscriberDoesNotMatchSubscriptionException(errorMessage);
}
}

return new EventMessageContext(message, includeHttpHeaders, span, spanInScope);
Optional.ofNullable(message.getHttpHeaders()).ifPresent(headers -> headers.put("x-pubsub-offset-id", new ArrayList<>(List.of(state.getUuid()))));
}
return new EventMessageContext(message, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope);
} catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) {
handleException(state, e);
return new EventMessageContext(null, includeHttpHeaders, span, spanInScope);
return new EventMessageContext(null, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope);
} finally {
pickSpan.finish();
}
Expand Down Expand Up @@ -168,16 +180,42 @@ private void pollMessageStates() {

Pageable pageable = PageRequest.of(0, pulsarConfig.getSseBatchSize(), Sort.by(Sort.Direction.ASC, "timestamp"));

var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(
List.of(Status.PROCESSED),
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();
Optional<MessageStateMongoDocument> offsetMsg = Optional.empty();
if (StringUtils.isNoneEmpty(currentOffset)) {
offsetMsg = messageStateMongoRepo.findById(currentOffset);
}

if (offsetMsg.isPresent()) {
var offsetTimestamp = offsetMsg.get().getTimestamp();

var list = messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
offsetTimestamp,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();

messageStates.addAll(list);
messageStates.addAll(list);

if (!list.isEmpty()) {
currentOffset = list.getLast().getUuid();
}
} else {
currentOffset = null;

var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(
List.of(Status.PROCESSED),
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();

messageStates.addAll(list);
}

lastPoll = Instant.now();
}
Expand Down
27 changes: 19 additions & 8 deletions src/main/java/de/telekom/horizon/pulsar/service/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import de.telekom.horizon.pulsar.config.PulsarConfig;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.SseTaskStateContainer;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -32,7 +33,6 @@ public class SseService {
private final SseTaskFactory sseTaskFactory;
private final SubscriberCache subscriberCache;
private final PulsarConfig pulsarConfig;

private final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

/**
Expand Down Expand Up @@ -90,19 +90,30 @@ public void validateSubscriberIdForSubscription(String environment, String subsc
/**
* Starts emitting events for the specified subscription.
*
* @param environment The environment associated with the subscription.
* @param subscriptionId The ID of the subscription for which events should be emitted.
* @param contentType The content type for the events.
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events.
* @return The {@link SseTaskStateContainer} representing the state of the emitted events.
* @param environment The environment associated with the subscription.
* @param subscriptionId The ID of the subscription for which events should be emitted.
* @param contentType The content type for the events.
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events.
* @param offset Enables offset based streaming. Specifies the offset (message id) of the last received event message.
* @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early.
* @return The {@link SseTaskStateContainer} representing the state of the emitted events.
*/
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders) {
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, String offset, StreamLimit streamLimit) {
var responseContainer = new SseTaskStateContainer();

taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders));
taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, offset, streamLimit));

responseContainer.setReady(pulsarConfig.getSseTimeout());

return responseContainer;
}

/**
* Stops emitting events for an existing active stream.
*
* @param subscriptionId The ID of the subscription for which events are being emitted.
*/
public void stopEmittingEvents(String subscriptionId) {
sseTaskFactory.getConnectionCache().removeConnectionForSubscription(subscriptionId);
}
}
Loading

0 comments on commit 7af9d83

Please sign in to comment.