Skip to content

Commit

Permalink
Better use of CompletableFuture in JavaCoapServerEndpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jan 20, 2023
1 parent 4c1913b commit 475baf7
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.leshan.core.endpoint.Protocol;
import org.eclipse.leshan.core.observation.Observation;
Expand All @@ -41,7 +44,6 @@
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.client.CoapClientBuilder;
import com.mbed.coap.client.ObservationConsumer;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;
Expand Down Expand Up @@ -82,127 +84,141 @@ public URI getURI() {
public <T extends LwM2mResponse> T send(ClientProfile destination, DownlinkRequest<T> request,
LowerLayerConfig lowerLayerConfig, long timeoutInMs) throws InterruptedException {

// Create Coap Request to send
CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);
// Send LWM2M Request
CompletableFuture<T> lwm2mResponseFuture = sendLwM2mRequest(destination, request, lowerLayerConfig);

// Create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);

// Send CoAP request synchronously
// Wait synchronously for LWM2M response
try {
// Handle special case of ObserveRequest
if (request instanceof ObserveRequest) {
// TODO HACK as we can not get token from coapresponse.
Opaque token = translator.getTokenGenerator().createToken();
final CoapRequest newCoapRequest = coapRequest.token(token);

// Add Callback to Handle notification
// TODO HACK we don't use sendSync because of observe Handling
CompletableFuture<CoapResponse> differedCoapResponse = coapClient.send(newCoapRequest);
differedCoapResponse.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> {
// Handle notification
ObserveResponse lwm2mResponse = null;
try {
// create LWM2M response
lwm2mResponse = (ObserveResponse) translator.createLwM2mResponse(destination, request,
coapRequest, notification, toolbox, token);
SingleObservation observation = lwm2mResponse.getObservation();

// check if we have observe relation in store for this notification
Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(),
observation.getId());
if (observeRelation != null) {
// we have an observe relation notify upper layer
notificationReceiver.onNotification(lwm2mResponse.getObservation(), destination,
lwm2mResponse);
return true;
} else {
// we haven't observe relation so stop this observation.
return false;
}
} catch (Exception e) {
if (lwm2mResponse != null) {
notificationReceiver.onError(lwm2mResponse.getObservation(), destination, e);
} else {
notificationReceiver
.onError(ObservationUtil.createSingleObservation(destination.getRegistrationId(),
(ObserveRequest) request, token, null), destination, e);
}
return false;
}

}));

// wait synchronously for CoAP response;
CoapResponse coapResponse = await(differedCoapResponse);

// translate CoAP response into LWM2M response
T lwm2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse,
toolbox, token);

// Add Observation to the store if relation is established
if (lwm2mResponse.isSuccess()) {
ObserveResponse observeResponse = (ObserveResponse) lwm2mResponse;
// TODO should we handle conflict ?
Collection<Observation> previousRelation = registrationStore
.addObservation(destination.getRegistrationId(), observeResponse.getObservation(), false);
if (!previousRelation.isEmpty()) {
// TODO log that a relation is override.
}

// notify upper layer that new relation is established
notificationReceiver.newObservation(observeResponse.getObservation(),
destination.getRegistration());
}

return lwm2mResponse;
} else {
// Common use case : Send CoAP Request
CoapResponse coapResponse = coapClient.sendSync(coapRequest);
// translate CoAP response into LWM2M response
return translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, toolbox, null);
}
} catch (CoapException e) {
throw new IllegalStateException("Unable to send request");
return lwm2mResponseFuture.get(timeoutInMs, TimeUnit.MILLISECONDS);
} catch (CompletionException | ExecutionException exception) {
// TODO we probably need to enhance this (better translate java-coap exceptions to leshan ones)
throw new SendFailedException("Unable to send request", exception);
} catch (TimeoutException e) {
lwm2mResponseFuture.cancel(true);
return null;
}
}

@Override
public <T extends LwM2mResponse> void send(ClientProfile destination, DownlinkRequest<T> request,
ResponseCallback<T> responseCallback, ErrorCallback errorCallback, LowerLayerConfig lowerLayerConfig,
long timeoutInMs) {
CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);

// create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);
// Send LWM2M Request
CompletableFuture<T> lwm2mResponseFuture = sendLwM2mRequest(destination, request, lowerLayerConfig);

// Send CoAP request asynchronously
coapClient.send(coapRequest)
// Attach callback
lwm2mResponseFuture
// Handle Exception
.exceptionally((exception) -> {
errorCallback.onError(new SendFailedException(exception));
// TODO we probably need to enhance this (better translate java-coap exceptions to leshan ones)
errorCallback.onError(new SendFailedException("Unable to send request", exception));
return null;
})
// Handle CoAP Response
.thenAccept((coapResponse) -> {
T lwM2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse,
toolbox, null);
.thenAccept((lwM2mResponse) -> {
responseCallback.onResponse(lwM2mResponse);
});

// TODO handle timeout (cancel future)
}

// TODO this is a copy/past from com.mbed.coap.client.CoapClient.await(CompletableFuture<CoapResponse>) we should
// find a better way.
private static CoapResponse await(CompletableFuture<CoapResponse> future) throws CoapException {
try {
return future.join();
} catch (CompletionException ex) {
if (ex.getCause() instanceof CoapException) {
throw (CoapException) ex.getCause();
} else {
throw new CoapException(ex.getCause());
}
protected <T extends LwM2mResponse> CompletableFuture<T> sendLwM2mRequest(ClientProfile destination,
DownlinkRequest<T> lwm2mRequest, LowerLayerConfig lowerLayerConfig) {

if (lwm2mRequest instanceof ObserveRequest) {
return sendObserveRequest(destination, lwm2mRequest, lowerLayerConfig);
} else {
// Create Coap Request to send from LWM2M Request
CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox);

// Create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);

// Send CoAP Request
CompletableFuture<CoapResponse> coapResponseFuture = coapClient.send(coapRequest);

// On response, create LWM2M Response from CoAP response
CompletableFuture<T> lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator
.createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, null));

return lwm2mResponseFuture;
}

}

protected <T extends LwM2mResponse> CompletableFuture<T> sendObserveRequest(ClientProfile destination,
DownlinkRequest<T> lwm2mRequest, LowerLayerConfig lowerLayerConfig) {
// Create Coap Request to send from LWM2M Request
CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox);

// TODO HACK as we can not get token from coap response.
Opaque token = translator.getTokenGenerator().createToken();
final CoapRequest hackedCoapRequest = coapRequest.token(token);

// Create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);

// Send CoAP Request
CompletableFuture<CoapResponse> coapResponseFuture = coapClient.send(hackedCoapRequest);

// Handle Notifications
// --------------------
coapResponseFuture.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> {
ObserveResponse lwm2mResponse = null;
try {
// Create LWM2M response
lwm2mResponse = (ObserveResponse) translator.createLwM2mResponse(destination, lwm2mRequest,
notification, toolbox, token);
SingleObservation observation = lwm2mResponse.getObservation();

// Check if we have observe relation in store for this notification
Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(),
observation.getId());
if (observeRelation != null) {
// We have an observe relation notify upper layer
notificationReceiver.onNotification(lwm2mResponse.getObservation(), destination, lwm2mResponse);
return true;
} else {
// We haven't observe relation so stop this observation.
return false;
}
} catch (Exception e) {
if (lwm2mResponse != null) {
notificationReceiver.onError(lwm2mResponse.getObservation(), destination, e);
} else {
notificationReceiver
.onError(ObservationUtil.createSingleObservation(destination.getRegistrationId(),
(ObserveRequest) lwm2mRequest, token, null), destination, e);
}
return false;
}

}));

// On response, create LWM2M Response from CoAP response
CompletableFuture<T> lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator
.createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, token));

// Handle Observation Relation
// ----------------------------
// Add Observation to the store if relation is established
lwm2mResponseFuture.thenAccept(lwm2mResponse -> {
if (lwm2mResponse.isSuccess()) {
ObserveResponse observeResponse = (ObserveResponse) lwm2mResponse;
// TODO should we handle conflict ?
Collection<Observation> previousRelation = registrationStore
.addObservation(destination.getRegistrationId(), observeResponse.getObservation(), false);
if (!previousRelation.isEmpty()) {
// TODO log that a relation is override.
}
// notify upper layer that new relation is established
notificationReceiver.newObservation(observeResponse.getObservation(), destination.getRegistration());
}
});

return lwm2mResponseFuture;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ public CoapRequest createCoapRequest(ClientProfile clientProfile,
}

public <T extends LwM2mResponse> T createLwM2mResponse(ClientProfile clientProfile, DownlinkRequest<T> lwm2mRequest,
CoapRequest coapRequest, CoapResponse coapResponse, ServerEndpointToolbox toolbox,
/* TODO HACK */ Opaque token) {
CoapResponse coapResponse, ServerEndpointToolbox toolbox, /* TODO HACK */ Opaque token) {

LwM2mResponseBuilder<T> builder = new LwM2mResponseBuilder<T>(coapRequest, coapResponse,
clientProfile.getEndpoint(), clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(),
LwM2mResponseBuilder<T> builder = new LwM2mResponseBuilder<T>(coapResponse, clientProfile.getEndpoint(),
clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(),
clientProfile.getRegistrationId(), token);
lwm2mRequest.accept(builder);
return builder.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Code;
import com.mbed.coap.packet.MediaTypes;
Expand All @@ -97,26 +96,28 @@ public class LwM2mResponseBuilder<T extends LwM2mResponse> implements DownlinkRe
private static final Logger LOG = LoggerFactory.getLogger(LwM2mResponseBuilder.class);

private LwM2mResponse lwM2mresponse;
// private final CoapRequest coapRequest;
private final String registrationId;

private final Opaque token; // TODO HACK because know token used

private final CoapResponse coapResponse;
private final String clientEndpoint;
private final String registrationId;
private final LwM2mModel model;
private final LwM2mDecoder decoder;
private final LwM2mLinkParser linkParser;
private final Opaque token;

public LwM2mResponseBuilder(CoapRequest coapRequest, CoapResponse coapResponse, String clientEndpoint,
LwM2mModel model, LwM2mDecoder decoder, LwM2mLinkParser linkParser, String registrationId,
/* TODO HACK */ Opaque token) {
// this.coapRequest = coapRequest;
public LwM2mResponseBuilder(CoapResponse coapResponse, String clientEndpoint, LwM2mModel model,
LwM2mDecoder decoder, LwM2mLinkParser linkParser, String registrationId, /* TODO HACK */ Opaque token) {
this.coapResponse = coapResponse;

this.clientEndpoint = clientEndpoint;
this.registrationId = registrationId;

this.model = model;
this.decoder = decoder;
this.linkParser = linkParser;

this.token = token;
this.registrationId = registrationId;
}

@Override
Expand Down

0 comments on commit 475baf7

Please sign in to comment.