From 475baf7d97f4e737321de739e3b81131b6134a3c Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Fri, 20 Jan 2023 17:55:58 +0100 Subject: [PATCH] Better use of CompletableFuture in JavaCoapServerEndpoint. --- .../endpoint/JavaCoapServerEndpoint.java | 218 ++++++++++-------- .../endpoint/ServerCoapMessageTranslator.java | 7 +- .../request/LwM2mResponseBuilder.java | 19 +- 3 files changed, 130 insertions(+), 114 deletions(-) diff --git a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/JavaCoapServerEndpoint.java b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/JavaCoapServerEndpoint.java index d43fe2186b..52947fc1a6 100644 --- a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/JavaCoapServerEndpoint.java +++ b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/JavaCoapServerEndpoint.java @@ -19,6 +19,9 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.eclipse.leshan.core.endpoint.Protocol; import org.eclipse.leshan.core.observation.Observation; @@ -41,7 +44,6 @@ import com.mbed.coap.client.CoapClient; import com.mbed.coap.client.CoapClientBuilder; import com.mbed.coap.client.ObservationConsumer; -import com.mbed.coap.exception.CoapException; import com.mbed.coap.packet.CoapRequest; import com.mbed.coap.packet.CoapResponse; import com.mbed.coap.packet.Opaque; @@ -82,88 +84,18 @@ public URI getURI() { public T send(ClientProfile destination, DownlinkRequest request, LowerLayerConfig lowerLayerConfig, long timeoutInMs) throws InterruptedException { - // Create Coap Request to send - CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox); + // Send LWM2M Request + CompletableFuture lwm2mResponseFuture = sendLwM2mRequest(destination, request, lowerLayerConfig); - // Create a Coap Client to send request - CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer); - - // Send CoAP request synchronously + // Wait synchronously for LWM2M response try { - // Handle special case of ObserveRequest - if (request instanceof ObserveRequest) { - // TODO HACK as we can not get token from coapresponse. - Opaque token = translator.getTokenGenerator().createToken(); - final CoapRequest newCoapRequest = coapRequest.token(token); - - // Add Callback to Handle notification - // TODO HACK we don't use sendSync because of observe Handling - CompletableFuture differedCoapResponse = coapClient.send(newCoapRequest); - differedCoapResponse.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> { - // Handle notification - ObserveResponse lwm2mResponse = null; - try { - // create LWM2M response - lwm2mResponse = (ObserveResponse) translator.createLwM2mResponse(destination, request, - coapRequest, notification, toolbox, token); - SingleObservation observation = lwm2mResponse.getObservation(); - - // check if we have observe relation in store for this notification - Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(), - observation.getId()); - if (observeRelation != null) { - // we have an observe relation notify upper layer - notificationReceiver.onNotification(lwm2mResponse.getObservation(), destination, - lwm2mResponse); - return true; - } else { - // we haven't observe relation so stop this observation. - return false; - } - } catch (Exception e) { - if (lwm2mResponse != null) { - notificationReceiver.onError(lwm2mResponse.getObservation(), destination, e); - } else { - notificationReceiver - .onError(ObservationUtil.createSingleObservation(destination.getRegistrationId(), - (ObserveRequest) request, token, null), destination, e); - } - return false; - } - - })); - - // wait synchronously for CoAP response; - CoapResponse coapResponse = await(differedCoapResponse); - - // translate CoAP response into LWM2M response - T lwm2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, - toolbox, token); - - // Add Observation to the store if relation is established - if (lwm2mResponse.isSuccess()) { - ObserveResponse observeResponse = (ObserveResponse) lwm2mResponse; - // TODO should we handle conflict ? - Collection previousRelation = registrationStore - .addObservation(destination.getRegistrationId(), observeResponse.getObservation(), false); - if (!previousRelation.isEmpty()) { - // TODO log that a relation is override. - } - - // notify upper layer that new relation is established - notificationReceiver.newObservation(observeResponse.getObservation(), - destination.getRegistration()); - } - - return lwm2mResponse; - } else { - // Common use case : Send CoAP Request - CoapResponse coapResponse = coapClient.sendSync(coapRequest); - // translate CoAP response into LWM2M response - return translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, toolbox, null); - } - } catch (CoapException e) { - throw new IllegalStateException("Unable to send request"); + return lwm2mResponseFuture.get(timeoutInMs, TimeUnit.MILLISECONDS); + } catch (CompletionException | ExecutionException exception) { + // TODO we probably need to enhance this (better translate java-coap exceptions to leshan ones) + throw new SendFailedException("Unable to send request", exception); + } catch (TimeoutException e) { + lwm2mResponseFuture.cancel(true); + return null; } } @@ -171,38 +103,122 @@ public T send(ClientProfile destination, DownlinkReque public void send(ClientProfile destination, DownlinkRequest request, ResponseCallback responseCallback, ErrorCallback errorCallback, LowerLayerConfig lowerLayerConfig, long timeoutInMs) { - CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox); - // create a Coap Client to send request - CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer); + // Send LWM2M Request + CompletableFuture lwm2mResponseFuture = sendLwM2mRequest(destination, request, lowerLayerConfig); - // Send CoAP request asynchronously - coapClient.send(coapRequest) + // Attach callback + lwm2mResponseFuture // Handle Exception .exceptionally((exception) -> { - errorCallback.onError(new SendFailedException(exception)); + // TODO we probably need to enhance this (better translate java-coap exceptions to leshan ones) + errorCallback.onError(new SendFailedException("Unable to send request", exception)); return null; }) // Handle CoAP Response - .thenAccept((coapResponse) -> { - T lwM2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, - toolbox, null); + .thenAccept((lwM2mResponse) -> { responseCallback.onResponse(lwM2mResponse); }); + + // TODO handle timeout (cancel future) } - // TODO this is a copy/past from com.mbed.coap.client.CoapClient.await(CompletableFuture) we should - // find a better way. - private static CoapResponse await(CompletableFuture future) throws CoapException { - try { - return future.join(); - } catch (CompletionException ex) { - if (ex.getCause() instanceof CoapException) { - throw (CoapException) ex.getCause(); - } else { - throw new CoapException(ex.getCause()); - } + protected CompletableFuture sendLwM2mRequest(ClientProfile destination, + DownlinkRequest lwm2mRequest, LowerLayerConfig lowerLayerConfig) { + + if (lwm2mRequest instanceof ObserveRequest) { + return sendObserveRequest(destination, lwm2mRequest, lowerLayerConfig); + } else { + // Create Coap Request to send from LWM2M Request + CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox); + + // Create a Coap Client to send request + CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer); + + // Send CoAP Request + CompletableFuture coapResponseFuture = coapClient.send(coapRequest); + + // On response, create LWM2M Response from CoAP response + CompletableFuture lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator + .createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, null)); + + return lwm2mResponseFuture; } + + } + + protected CompletableFuture sendObserveRequest(ClientProfile destination, + DownlinkRequest lwm2mRequest, LowerLayerConfig lowerLayerConfig) { + // Create Coap Request to send from LWM2M Request + CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox); + + // TODO HACK as we can not get token from coap response. + Opaque token = translator.getTokenGenerator().createToken(); + final CoapRequest hackedCoapRequest = coapRequest.token(token); + + // Create a Coap Client to send request + CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer); + + // Send CoAP Request + CompletableFuture coapResponseFuture = coapClient.send(hackedCoapRequest); + + // Handle Notifications + // -------------------- + coapResponseFuture.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> { + ObserveResponse lwm2mResponse = null; + try { + // Create LWM2M response + lwm2mResponse = (ObserveResponse) translator.createLwM2mResponse(destination, lwm2mRequest, + notification, toolbox, token); + SingleObservation observation = lwm2mResponse.getObservation(); + + // Check if we have observe relation in store for this notification + Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(), + observation.getId()); + if (observeRelation != null) { + // We have an observe relation notify upper layer + notificationReceiver.onNotification(lwm2mResponse.getObservation(), destination, lwm2mResponse); + return true; + } else { + // We haven't observe relation so stop this observation. + return false; + } + } catch (Exception e) { + if (lwm2mResponse != null) { + notificationReceiver.onError(lwm2mResponse.getObservation(), destination, e); + } else { + notificationReceiver + .onError(ObservationUtil.createSingleObservation(destination.getRegistrationId(), + (ObserveRequest) lwm2mRequest, token, null), destination, e); + } + return false; + } + + })); + + // On response, create LWM2M Response from CoAP response + CompletableFuture lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator + .createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, token)); + + // Handle Observation Relation + // ---------------------------- + // Add Observation to the store if relation is established + lwm2mResponseFuture.thenAccept(lwm2mResponse -> { + if (lwm2mResponse.isSuccess()) { + ObserveResponse observeResponse = (ObserveResponse) lwm2mResponse; + // TODO should we handle conflict ? + Collection previousRelation = registrationStore + .addObservation(destination.getRegistrationId(), observeResponse.getObservation(), false); + if (!previousRelation.isEmpty()) { + // TODO log that a relation is override. + } + // notify upper layer that new relation is established + notificationReceiver.newObservation(observeResponse.getObservation(), destination.getRegistration()); + } + }); + + return lwm2mResponseFuture; + } @Override diff --git a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/ServerCoapMessageTranslator.java b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/ServerCoapMessageTranslator.java index 4124079791..3ba858b029 100644 --- a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/ServerCoapMessageTranslator.java +++ b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/endpoint/ServerCoapMessageTranslator.java @@ -50,11 +50,10 @@ public CoapRequest createCoapRequest(ClientProfile clientProfile, } public T createLwM2mResponse(ClientProfile clientProfile, DownlinkRequest lwm2mRequest, - CoapRequest coapRequest, CoapResponse coapResponse, ServerEndpointToolbox toolbox, - /* TODO HACK */ Opaque token) { + CoapResponse coapResponse, ServerEndpointToolbox toolbox, /* TODO HACK */ Opaque token) { - LwM2mResponseBuilder builder = new LwM2mResponseBuilder(coapRequest, coapResponse, - clientProfile.getEndpoint(), clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(), + LwM2mResponseBuilder builder = new LwM2mResponseBuilder(coapResponse, clientProfile.getEndpoint(), + clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(), clientProfile.getRegistrationId(), token); lwm2mRequest.accept(builder); return builder.getResponse(); diff --git a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/request/LwM2mResponseBuilder.java b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/request/LwM2mResponseBuilder.java index 6bcf40ccb0..3910ac9d2d 100644 --- a/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/request/LwM2mResponseBuilder.java +++ b/leshan-tl-javacoap-server/src/main/java/org/eclipse/leshan/transport/javacoap/request/LwM2mResponseBuilder.java @@ -79,7 +79,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mbed.coap.packet.CoapRequest; import com.mbed.coap.packet.CoapResponse; import com.mbed.coap.packet.Code; import com.mbed.coap.packet.MediaTypes; @@ -97,26 +96,28 @@ public class LwM2mResponseBuilder implements DownlinkRe private static final Logger LOG = LoggerFactory.getLogger(LwM2mResponseBuilder.class); private LwM2mResponse lwM2mresponse; - // private final CoapRequest coapRequest; - private final String registrationId; + + private final Opaque token; // TODO HACK because know token used + private final CoapResponse coapResponse; private final String clientEndpoint; + private final String registrationId; private final LwM2mModel model; private final LwM2mDecoder decoder; private final LwM2mLinkParser linkParser; - private final Opaque token; - public LwM2mResponseBuilder(CoapRequest coapRequest, CoapResponse coapResponse, String clientEndpoint, - LwM2mModel model, LwM2mDecoder decoder, LwM2mLinkParser linkParser, String registrationId, - /* TODO HACK */ Opaque token) { - // this.coapRequest = coapRequest; + public LwM2mResponseBuilder(CoapResponse coapResponse, String clientEndpoint, LwM2mModel model, + LwM2mDecoder decoder, LwM2mLinkParser linkParser, String registrationId, /* TODO HACK */ Opaque token) { this.coapResponse = coapResponse; + this.clientEndpoint = clientEndpoint; + this.registrationId = registrationId; + this.model = model; this.decoder = decoder; this.linkParser = linkParser; + this.token = token; - this.registrationId = registrationId; } @Override