Skip to content

Commit

Permalink
Add live packet to SSE item state connections (#3086)
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Cunningham <dan@digitaldan.com>
  • Loading branch information
digitaldan authored Sep 26, 2022
1 parent 065e33f commit b808ea6
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ public class SseResource implements RESTResource, SsePublisher {

private static final String X_ACCEL_BUFFERING_HEADER = "X-Accel-Buffering";

public static final int ALIVE_INTERVAL_SECONDS = 10;

private final Logger logger = LoggerFactory.getLogger(SseResource.class);

private final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
private final ScheduledFuture<?> cleanSubscriptionsJob;
private final ScheduledFuture<?> aliveEventJob;

private @Context @NonNullByDefault({}) Sse sse;

Expand All @@ -120,20 +122,21 @@ public SseResource(@Reference SseItemStatesEventBuilder itemStatesEventBuilder)
this.executorService = Executors.newSingleThreadExecutor();
this.itemStatesEventBuilder = itemStatesEventBuilder;

cleanSubscriptionsJob = scheduler.scheduleWithFixedDelay(() -> {
logger.debug("Run clean SSE subscriptions job");
OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event")
.mediaType(MediaType.APPLICATION_JSON_TYPE).data(new ServerAliveEvent()).build();
topicBroadcaster.send(outboundSseEvent);
}, 1, 2, TimeUnit.MINUTES);
aliveEventJob = scheduler.scheduleWithFixedDelay(() -> {
logger.debug("Sending alive event to SSE connections");
OutboundSseEvent aliveEvent = sse.newEventBuilder().name("alive").mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(new AliveEvent()).build();
itemStatesBroadcaster.send(aliveEvent);
topicBroadcaster.send(aliveEvent);
}, 1, ALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Deactivate
public void deactivate() {
itemStatesBroadcaster.close();
topicBroadcaster.close();
executorService.shutdown();
cleanSubscriptionsJob.cancel(true);
aliveEventJob.cancel(true);
}

@Override
Expand Down Expand Up @@ -262,7 +265,8 @@ public void handleEventBroadcastItemState(final ItemStateChangedEvent stateChang
}
}

private static class ServerAliveEvent {
private static class AliveEvent {
public final String type = "ALIVE";
public final int interval = ALIVE_INTERVAL_SECONDS;
}
}

0 comments on commit b808ea6

Please sign in to comment.