Skip to content

Commit

Permalink
Shutdown async observer thread on destroy.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Nov 15, 2019
1 parent 1ccd556 commit bf1ae7f
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.eclipse.leshan.client.californium.request;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.californium.core.coap.MessageObserver;
import org.eclipse.californium.core.coap.Request;
Expand All @@ -28,9 +30,12 @@
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.util.NamedThreadFactory;

public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender {

private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("Leshan Async Request timeout"));
private final CaliforniumEndpointsManager endpointsManager;

public CaliforniumLwM2mRequestSender(CaliforniumEndpointsManager endpointsManager) {
Expand Down Expand Up @@ -74,7 +79,8 @@ public <T extends LwM2mResponse> void send(InetSocketAddress serverAddress, bool
Request coapRequest = coapClientRequestBuilder.getRequest();

// Add CoAP request callback
MessageObserver obs = new AsyncRequestObserver<T>(coapRequest, responseCallback, errorCallback, timeout) {
MessageObserver obs = new AsyncRequestObserver<T>(coapRequest, responseCallback, errorCallback, timeout,
executor) {
@Override
public T buildResponse(Response coapResponse) {
// Build LwM2m response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*******************************************************************************/
package org.eclipse.leshan.core.californium;

import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.leshan.core.response.ErrorCallback;
Expand Down Expand Up @@ -44,10 +46,11 @@ public abstract class AsyncRequestObserver<T extends LwM2mResponse> extends Coap
* @param errorCallback This is called when an error happens. This MUST NOT be null.
* @param timeoutInMs A response timeout(in millisecond) which is raised if neither a response or error happens (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).
* @param executor used to scheduled timeout tasks.
*/
public AsyncRequestObserver(Request coapRequest, final ResponseCallback<T> responseCallback,
final ErrorCallback errorCallback, long timeoutInMs) {
super(coapRequest, null, errorCallback, timeoutInMs);
final ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor) {
super(coapRequest, null, errorCallback, timeoutInMs, executor);
this.responseCallback = new CoapResponseCallback() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*******************************************************************************/
package org.eclipse.leshan.core.californium;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +29,6 @@
import org.eclipse.leshan.core.request.exception.RequestRejectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,13 +44,13 @@
public class CoapAsyncRequestObserver extends AbstractRequestObserver {

private static final Logger LOG = LoggerFactory.getLogger(CoapAsyncRequestObserver.class);
private static volatile ScheduledExecutorService executor;

protected CoapResponseCallback responseCallback;
private final ErrorCallback errorCallback;
private final long timeoutInMs;
private ScheduledFuture<?> cleaningTask;
private boolean cancelled = false;
private ScheduledExecutorService executor;

// The Californium API does not ensure that message callback are exclusive
// meaning that you can get a onReponse call and a onCancel one.
Expand All @@ -75,13 +73,15 @@ public class CoapAsyncRequestObserver extends AbstractRequestObserver {
* @param errorCallback This is called when an error happens. This MUST NOT be null.
* @param timeoutInMs A response timeout(in millisecond) which is raised if neither a response or error happens (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).
* @param executor used to scheduled timeout tasks.
*/
public CoapAsyncRequestObserver(Request coapRequest, CoapResponseCallback responseCallback,
ErrorCallback errorCallback, long timeoutInMs) {
ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor) {
super(coapRequest);
this.responseCallback = responseCallback;
this.errorCallback = errorCallback;
this.timeoutInMs = timeoutInMs;
this.executor = executor;
}

@Override
Expand Down Expand Up @@ -161,7 +161,7 @@ private synchronized void scheduleCleaningTask() {
if (!cancelled)
if (cleaningTask == null) {
LOG.trace("Schedule Cleaning Task for {}", coapRequest);
cleaningTask = getExecutor().schedule(new Runnable() {
cleaningTask = executor.schedule(new Runnable() {
@Override
public void run() {
responseTimedOut.set(true);
Expand All @@ -177,16 +177,4 @@ private synchronized void cancelCleaningTask() {
}
cancelled = true;
}

private ScheduledExecutorService getExecutor() {
if (executor == null) {
synchronized (this.getClass()) {
if (executor == null) {
executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("Leshan Async Request timeout"));
}
}
}
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ public void start() {
if (securityStore instanceof Startable) {
((Startable) securityStore).start();
}
if (requestSender instanceof Startable) {
((Startable) requestSender).start();
}

// Start server
coapServer.start();
Expand All @@ -299,6 +302,9 @@ public void stop() {
if (securityStore instanceof Stoppable) {
((Stoppable) securityStore).stop();
}
if (requestSender instanceof Stoppable) {
((Stoppable) requestSender).stop();
}

LOG.info("LWM2M server stopped.");
}
Expand All @@ -325,6 +331,11 @@ public void destroy() {
((Stoppable) securityStore).stop();
}

if (requestSender instanceof Destroyable) {
((Destroyable) requestSender).destroy();
} else if (requestSender instanceof Stoppable) {
((Stoppable) requestSender).stop();
}
LOG.info("LWM2M server destroyed.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.Destroyable;
import org.eclipse.leshan.server.bootstrap.BootstrapSession;
import org.eclipse.leshan.server.bootstrap.LwM2mBootstrapRequestSender;
import org.eclipse.leshan.server.californium.request.RequestSender;
Expand All @@ -37,7 +38,7 @@
/**
* An implementation of {@link LwM2mBootstrapRequestSender} based on Californium.
*/
public class CaliforniumLwM2mBootstrapRequestSender implements LwM2mBootstrapRequestSender {
public class CaliforniumLwM2mBootstrapRequestSender implements LwM2mBootstrapRequestSender, Destroyable {

static final Logger LOG = LoggerFactory.getLogger(CaliforniumLwM2mBootstrapRequestSender.class);

Expand Down Expand Up @@ -126,4 +127,9 @@ public <T extends LwM2mResponse> void send(BootstrapSession destination, Downlin
public void cancelOngoingRequests(BootstrapSession destination) {
sender.cancelRequests(destination.getId());
}

@Override
public void destroy() {
sender.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.codec.LwM2mNodeDecoder;
import org.eclipse.leshan.core.node.codec.LwM2mNodeEncoder;
import org.eclipse.leshan.server.Destroyable;
import org.eclipse.leshan.server.Startable;
import org.eclipse.leshan.server.Stoppable;
import org.eclipse.leshan.server.bootstrap.BootstrapConfigStore;
import org.eclipse.leshan.server.bootstrap.BootstrapHandler;
import org.eclipse.leshan.server.bootstrap.BootstrapHandlerFactory;
Expand Down Expand Up @@ -54,6 +57,8 @@ public class LeshanBootstrapServer {
private final BootstrapConfigStore bsStore;
private final BootstrapSecurityStore bsSecurityStore;

private LwM2mBootstrapRequestSender requestSender;

/**
* /** Initialize a server which will bind to the specified address and port.
* <p>
Expand Down Expand Up @@ -97,8 +102,7 @@ public LeshanBootstrapServer(CoapEndpoint unsecuredEndpoint, CoapEndpoint secure
coapServer.addEndpoint(securedEndpoint);

// create request sender
LwM2mBootstrapRequestSender requestSender = createRequestSender(securedEndpoint, unsecuredEndpoint, model,
encoder, decoder);
requestSender = createRequestSender(securedEndpoint, unsecuredEndpoint, model, encoder, decoder);

// create bootstrap resource
CoapResource bsResource = createBootstrapResource(
Expand Down Expand Up @@ -147,6 +151,9 @@ public BootstrapConfigStore getBoostrapStore() {
* Starts the server and binds it to the specified port.
*/
public void start() {
if (requestSender instanceof Startable) {
((Startable) requestSender).start();
}
coapServer.start();

if (LOG.isInfoEnabled()) {
Expand All @@ -161,6 +168,9 @@ public void start() {
*/
public void stop() {
coapServer.stop();
if (requestSender instanceof Stoppable) {
((Stoppable) requestSender).stop();
}
LOG.info("Bootstrap server stopped.");
}

Expand All @@ -171,6 +181,12 @@ public void stop() {
*/
public void destroy() {
coapServer.destroy();

if (requestSender instanceof Destroyable) {
((Destroyable) requestSender).destroy();
} else if (requestSender instanceof Stoppable) {
((Stoppable) requestSender).stop();
}
LOG.info("Bootstrap server destroyed.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.Destroyable;
import org.eclipse.leshan.server.californium.observation.ObservationServiceImpl;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.registration.Registration;
Expand All @@ -42,7 +43,7 @@
/**
* An implementation of {@link LwM2mRequestSender} and {@link CoapRequestSender} based on Californium.
*/
public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender, CoapRequestSender {
public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender, CoapRequestSender, Destroyable {

private final ObservationServiceImpl observationService;
private final LwM2mModelProvider modelProvider;
Expand Down Expand Up @@ -218,4 +219,9 @@ public void cancelOngoingRequests(Registration registration) {
Validate.notNull(registration);
sender.cancelRequests(registration.getId());
}

@Override
public void destroy() {
sender.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.leshan.core.request.exception.ClientSleepingException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.server.Destroyable;
import org.eclipse.leshan.server.queue.Presence;
import org.eclipse.leshan.server.queue.PresenceServiceImpl;
import org.eclipse.leshan.server.queue.QueueModeLwM2mRequestSender;
Expand All @@ -30,7 +31,8 @@
/**
* A {@link LwM2mRequestSender} and {@link CoapRequestSender} which supports LWM2M Queue Mode.
*/
public class CaliforniumQueueModeRequestSender extends QueueModeLwM2mRequestSender implements CoapRequestSender {
public class CaliforniumQueueModeRequestSender extends QueueModeLwM2mRequestSender
implements CoapRequestSender, Destroyable {

/**
* @param presenceService the presence service object for setting the client into {@link Presence#SLEEPING} when
Expand Down Expand Up @@ -128,4 +130,11 @@ public void onError(Exception e) {
}
});
}

@Override
public void destroy() {
if (delegatedSender instanceof Destroyable) {
((Destroyable) delegatedSender).destroy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.SortedMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.californium.core.coap.MessageObserver;
Expand Down Expand Up @@ -45,7 +48,9 @@
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.Destroyable;
import org.eclipse.leshan.server.californium.bootstrap.CaliforniumLwM2mBootstrapRequestSender;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.eclipse.leshan.util.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,9 +60,13 @@
* <p>
* It can also link requests to a kind of "session" and cancel all ongoing requests associated to a given "session".
*/
public class RequestSender {
public class RequestSender implements Destroyable {

static final Logger LOG = LoggerFactory.getLogger(CaliforniumLwM2mBootstrapRequestSender.class);

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("Leshan Async Request timeout"));

private final Endpoint nonSecureEndpoint;
private final Endpoint secureEndpoint;
private final LwM2mNodeDecoder decoder;
Expand Down Expand Up @@ -190,7 +199,8 @@ public <T extends LwM2mResponse> void sendLwm2mRequest(final String endpointName
final Request coapRequest = coapClientRequestBuilder.getRequest();

// Add CoAP request callback
MessageObserver obs = new AsyncRequestObserver<T>(coapRequest, responseCallback, errorCallback, timeoutInMs) {
MessageObserver obs = new AsyncRequestObserver<T>(coapRequest, responseCallback, errorCallback, timeoutInMs,
executor) {
@Override
public T buildResponse(Response coapResponse) {
// Build LwM2m response
Expand Down Expand Up @@ -294,7 +304,8 @@ public void sendCoapRequest(Identity destination, String sessionId, Request coap
coapRequest.setDestinationContext(context);

// Add CoAP request callback
MessageObserver obs = new CoapAsyncRequestObserver(coapRequest, responseCallback, errorCallback, timeoutInMs);
MessageObserver obs = new CoapAsyncRequestObserver(coapRequest, responseCallback, errorCallback, timeoutInMs,
executor);
coapRequest.addMessageObserver(obs);

// Store pending request to be able to cancel it later
Expand Down Expand Up @@ -390,4 +401,14 @@ public void onCancel() {
removeOngoingRequest(requestKey, coapRequest);
}
}

@Override
public void destroy() {
executor.shutdownNow();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Destroying RequestSender was interrupted.", e);
}
}
}

0 comments on commit bf1ae7f

Please sign in to comment.