Skip to content

Commit

Permalink
Feat(client): Add Telemetry APIs (#15047)
Browse files Browse the repository at this point in the history
  • Loading branch information
azabbasi authored Sep 11, 2020
1 parent 0137fec commit 2491428
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ PagedFlux<String> listRelationships(String digitalTwinId, String relationshipNam
* @param modelId The Id of the model to decommission.
* @return an empty Mono
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> decommissionModel(String modelId) {
return decommissionModelWithResponse(modelId)
.flatMap(voidResponse -> Mono.empty());
Expand All @@ -947,6 +948,7 @@ public Mono<Void> decommissionModel(String modelId) {
* @param modelId The Id of the model to decommission.
* @return The http response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> decommissionModelWithResponse(String modelId) {
return withContext(context -> decommissionModelWithResponse(modelId, context));
}
Expand Down Expand Up @@ -1338,4 +1340,106 @@ Mono<PagedResponse<EventRoute>> listEventRoutesNextPage(String nextLink, Context
}

//endregion Event Route APIs

//region Telemetry APIs

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @return An empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> publishTelemetry(String digitalTwinId, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
return withContext(context -> publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context))
.flatMap(voidResponse -> Mono.empty());
}

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @return A {@link Response} containing an empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions) {
return withContext(context -> publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context));
}

Mono<Response<Void>> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
Object payloadObject = null;
try {
payloadObject = mapper.readValue(payload, Object.class);
}
catch (JsonProcessingException e) {
logger.error("Could not parse the payload [%s]: %s", payload, e);
return Mono.error(e);
}

return protocolLayer.getDigitalTwins().sendTelemetryWithResponseAsync(
digitalTwinId,
publishTelemetryRequestOptions.getMessageId(),
payloadObject,
publishTelemetryRequestOptions.getTimestamp().toString(),
context);
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @return An empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> publishComponentTelemetry(String digitalTwinId, String componentName, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
return withContext(context -> publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context))
.flatMap(voidResponse -> Mono.empty());
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @return A {@link Response} containing an empty mono.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions) {
return withContext(context -> publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context));
}

Mono<Response<Void>> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {

Object payloadObject = null;
try {
payloadObject = mapper.readValue(payload, Object.class);
}
catch (JsonProcessingException e) {
logger.error("Could not parse the payload [%s]: %s", payload, e);
return Mono.error(e);
}

return protocolLayer.getDigitalTwins().sendComponentTelemetryWithResponseAsync(
digitalTwinId,
componentName,
publishTelemetryRequestOptions.getMessageId(),
payloadObject,
publishTelemetryRequestOptions.getTimestamp().toString(),
context);
}

//endregion Telemetry APIs
}
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ public Response<Void> deleteModelWithResponse(String modelId, Context context) {
* Decommissions a model.
* @param modelId The Id of the model to decommission.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void decommissionModel(String modelId) {
decommissionModelWithResponse(modelId, Context.NONE);
}
Expand All @@ -571,6 +572,7 @@ public void decommissionModel(String modelId) {
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return The http response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> decommissionModelWithResponse(String modelId, Context context) {
return digitalTwinsAsyncClient.decommissionModelWithResponse(modelId, context).block();
}
Expand Down Expand Up @@ -728,10 +730,11 @@ public void createEventRoute(String eventRouteId, EventRoute eventRoute) {
* @param eventRouteId The id of the event route to create.
* @param eventRoute The event route to create.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void createEventRouteWithResponse(String eventRouteId, EventRoute eventRoute, Context context) {
this.digitalTwinsAsyncClient.createEventRouteWithResponse(eventRouteId, eventRoute, context).block();
public Response<Void> createEventRouteWithResponse(String eventRouteId, EventRoute eventRoute, Context context) {
return this.digitalTwinsAsyncClient.createEventRouteWithResponse(eventRouteId, eventRoute, context).block();
}

/**
Expand Down Expand Up @@ -801,4 +804,66 @@ public PagedIterable<EventRoute> listEventRoutes(EventRoutesListOptions options,
}

//endregion Event Route APIs

//region Telemetry APIs

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void publishTelemetry(String digitalTwinId, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, Context.NONE);
}

/**
* Publishes telemetry from a digital twin
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> publishTelemetryWithResponse(String digitalTwinId, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
return digitalTwinsAsyncClient.publishTelemetryWithResponse(digitalTwinId, payload, publishTelemetryRequestOptions, context).block();
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void publishComponentTelemetry(String digitalTwinId, String componentName, String payload) {
PublishTelemetryRequestOptions publishTelemetryRequestOptions = new PublishTelemetryRequestOptions();
publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, Context.NONE);
}

/**
* Publishes telemetry from a digital twin's component
* The result is then consumed by one or many destination endpoints (subscribers) defined under {@link EventRoute}
* These event routes need to be set before publishing a telemetry message, in order for the telemetry message to be consumed.
* @param digitalTwinId The Id of the digital twin.
* @param componentName The name of the DTDL component.
* @param payload The application/json telemetry payload to be sent.
* @param publishTelemetryRequestOptions The additional information to be used when processing a telemetry request.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A {@link Response}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> publishComponentTelemetryWithResponse(String digitalTwinId, String componentName, String payload, PublishTelemetryRequestOptions publishTelemetryRequestOptions, Context context) {
return digitalTwinsAsyncClient.publishComponentTelemetryWithResponse(digitalTwinId, componentName, payload, publishTelemetryRequestOptions, context).block();
}

//endregion TelemetryAPIs
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.azure.digitaltwins.core.util;

import com.azure.core.annotation.Fluent;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

/**
* The additional information to be used when processing a telemetry request.
*/
@Fluent
public final class PublishTelemetryRequestOptions {

/**
* A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
* Defaults to a random guid.
*/
private String messageId = UUID.randomUUID().toString();

/**
* An RFC 3339 timestamp that identifies the time the telemetry was measured.
* It defaults to the current date/time UTC.
*/
private OffsetDateTime timestamp = OffsetDateTime.now(ZoneOffset.UTC);

/**
* Gets the message Id.
* @return A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
*/
public String getMessageId() {
return this.messageId;
}

/**
* Gets the timestamp.
* @return The timestamp that identifies the time the telemetry was measured.
*/
public OffsetDateTime getTimestamp() {
return this.timestamp;
}

/**
* Set the message Id
* @param messageId A unique message identifier (within the scope of the digital twin id) that is commonly used for de-duplicating messages.
* @return The PublishTelemetryRequestOptions object itself.
*/
public PublishTelemetryRequestOptions setMessageId(String messageId) {
this.messageId = messageId;
return this;
}

/**
* Set the timestamp
* @param timestamp The timestamp that identifies the time the telemetry was measured.
* @return The PublishTelemetryRequestOptions object itself.
*/
public PublishTelemetryRequestOptions setTimestamp(OffsetDateTime timestamp) {
this.timestamp = timestamp;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void componentLifecycleTest(HttpClient httpClient, DigitalTwinsServiceVer
StepVerifier.create(asyncClient.updateComponentWithResponse(roomWithWifiTwinId, wifiComponentName, TestAssetsHelper.getWifiComponentUpdatePayload(), new UpdateComponentRequestOptions()))
.assertNext(updateResponse -> {
assertEquals(updateResponse.getStatusCode(), HttpURLConnection.HTTP_NO_CONTENT);
logger.info("Updated component successfully");
logger.info("Updated the component successfully");
})
.verifyComplete();
}
Expand Down

0 comments on commit 2491428

Please sign in to comment.