Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fhir-notification: Limit handling for 1M #1948 #2511

Merged
merged 3 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,11 @@ Field name | Type | Description
`lastUpdated` | String | The date and time of the last update made to the resource associated with the notification event.
`resourceId` | String | The logical id of the resource associated with the notification event.
`resource` | String | A stringified JSON object which is the resource associated with the notification event.
`tenantId` | String | The tenant that generated this notification
`datasourceId` | String | The datasource used by the tenant

The following JSON is an example of a serialized notification event:

```
{
"lastUpdated":"2016-06-01T10:36:23.232-05:00",
Expand All @@ -569,6 +572,8 @@ The following JSON is an example of a serialized notification event:
}
prb112 marked this conversation as resolved.
Show resolved Hide resolved
```

If the resource is over the limit specified in `fhirServer/notifications/common/maxNotificationSizeBytes`, the default value is to subset `id`, `meta` and `resourceType` and add the subset to the FHIRNotificationEvent. In alternative configurations, user may set `fhirServer/notifications/common/maxNotificationSizeBehavior` to `omit` and subsequently retrieve the resource using the location.

### 4.2.2 WebSocket
The WebSocket implementation of the notification service will publish notification event messages to a WebSocket. To enable WebSocket notifications, set the `fhirServer/notifications/websocket/enabled` property to `true`, as in the following example:

Expand Down Expand Up @@ -630,6 +635,8 @@ Before you enable Kafka notifications, it's important to understand the topology

On the other hand, if you have two completely independent FHIR server instances, then you should configure each one with its own topic name.

The FHIRNotificationEvent is asynchronous by default. If you want to specify a synchronous request, you can set `fhirServer/notifications/kafka/sync` to true, which ensures no message is lost in publishing, however it does add latency in each request.

### 4.2.3 NATS
The [NATS](http://nats.io) implementation of the notification service publishes notification event messages to a NATS streaming cluster. To configure the NATS notification publisher, configure properties in the `fhir-server-config.json` file as shown in the following example:

Expand Down Expand Up @@ -1995,8 +2002,11 @@ This section contains reference information about each of the configuration prop
|`fhirServer/resources/<resourceType>/searchParameterCombinations`|string list|A comma-separated list of search parameter combinations supported for this resource type. Each search parameter combination is a string, where a plus sign, `+`, separates the search parameters that can be used in combination. To indicate that searching without any search parameters is allowed, an empty string must be included in the list. Including an asterisk, `*`, in the list indicates support of any search parameter combination. For resources without the property, the value of `fhirServer/resources/Resource/searchParameterCombinations` is used.|
|`fhirServer/resources/<resourceType>/profiles/atLeastOne`|string list|A comma-separated list of profiles, at least one of which must be specified in a resource's `meta.profile` element and be successfully validated against in order for a resource of this type to be persisted to the FHIR server. If this property is not specified, or if an empty list is specified, the value of `fhirServer/resources/Resource/profiles/atLeastOne` will be used.|
|`fhirServer/notifications/common/includeResourceTypes`|string list|A comma-separated list of resource types for which notification event messages should be published.|
|`fhirServer/notifications/common/maxNotificationSizeBytes`|integer|The maximum size in byte of the notification that should be sent|
|`fhirServer/notifications/common/maxNotificationSizeBehavior`|string|The behavior of the notification framework when a notification is over the maxNotificationSizeBytes. Valid values are subset and omit|
|`fhirServer/notifications/websocket/enabled`|boolean|A boolean flag which indicates whether or not websocket notifications are enabled.|
|`fhirServer/notifications/kafka/enabled`|boolean|A boolean flag which indicates whether or not kafka notifications are enabled.|
|`fhirServer/notifications/kafka/sync`|boolean|A boolean flag which indicates whether or not the FHIRNotificationEvent is sent in a synchronous mode|
|`fhirServer/notifications/kafka/topicName`|string|The name of the topic to which kafka notification event messages should be published.|
|`fhirServer/notifications/kafka/connectionProperties`|property list|A group of connection properties used to configure the KafkaProducer. These properties are used as-is when instantiating the KafkaProducer used by the FHIR server for publishing notification event messages.|
|`fhirServer/notifications/nats/enabled`|boolean|A boolean flag which indicates whether or not NATS notifications are enabled.|
Expand Down Expand Up @@ -2142,8 +2152,11 @@ This section contains reference information about each of the configuration prop
|`fhirServer/resources/<resourceType>/searchParameterCombinations`|null (inherits from `fhirServer/resources/Resource/searchParameterCombinations`)|
|`fhirServer/resources/<resourceType>/profiles/atLeastOne`|null (inherits from `fhirServer/resources/Resource/profiles/atLeastOne`)|
|`fhirServer/notifications/common/includeResourceTypes`|`["*"]`|
|`fhirServer/notifications/common/maxNotificationSizeBytes`|integer|1000000|
|`fhirServer/notifications/common/maxNotificationSizeBehavior`|string|subset|
|`fhirServer/notifications/websocket/enabled`|false|
|`fhirServer/notifications/kafka/enabled`|false|
|`fhirServer/notifications/kafka/sync`|false|
|`fhirServer/notifications/kafka/topicName`|fhirNotifications|
|`fhirServer/notifications/kafka/connectionProperties`|`{}`|
|`fhirServer/notifications/nats/enabled`|false|
Expand Down Expand Up @@ -2279,8 +2292,11 @@ must restart the server for that change to take effect.
|`fhirServer/resources/<resourceType>/searchParameterCombinations`|Y|Y|
|`fhirServer/resources/<resourceType>/profiles/atLeastOne`|Y|Y|
|`fhirServer/notifications/common/includeResourceTypes`|N|N|
|`fhirServer/notifications/common/maxNotificationSizeBytes`|Y|N|
|`fhirServer/notifications/common/maxNotificationSizeBehavior`|Y|N|
|`fhirServer/notifications/websocket/enabled`|N|N|
|`fhirServer/notifications/kafka/enabled`|N|N|
|`fhirServer/notifications/kafka/sync`|Y|N|
|`fhirServer/notifications/kafka/topicName`|N|N|
|`fhirServer/notifications/kafka/connectionProperties`|N|N|
|`fhirServer/notifications/nats/enabled`|N|N|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ public class FHIRConfiguration {

// Notification config properties
public static final String PROPERTY_NOTIFICATION_RESOURCE_TYPES = "fhirServer/notifications/common/includeResourceTypes";
public static final String PROPERTY_NOTIFICATION_NOTIFICATION_SIZE_BEHAVIOR = "fhirServer/notifications/common/maxNotificationSizeBehavior";
public static final String PROPERTY_NOTIFICATION_MAX_SIZE = "fhirServer/notifications/common/maxNotificationSizeBytes";
public static final String PROPERTY_WEBSOCKET_ENABLED = "fhirServer/notifications/websocket/enabled";
public static final String PROPERTY_KAFKA_ENABLED = "fhirServer/notifications/kafka/enabled";
public static final String PROPERTY_KAFKA_TOPICNAME = "fhirServer/notifications/kafka/topicName";
public static final String PROPERTY_KAFKA_CONNECTIONPROPS = "fhirServer/notifications/kafka/connectionProperties";
public static final String PROPERTY_KAFKA_SYNC = "fhirServer/notifications/kafka/sync";
public static final String PROPERTY_NATS_ENABLED = "fhirServer/notifications/nats/enabled";
public static final String PROPERTY_NATS_CLUSTER = "fhirServer/notifications/nats/cluster";
public static final String PROPERTY_NATS_CHANNEL = "fhirServer/notifications/nats/channel";
Expand Down
5 changes: 5 additions & 0 deletions fhir-notification/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@
<artifactId>jakarta.ws.rs-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@
package com.ibm.fhir.notification.util;

import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

import com.ibm.fhir.config.FHIRConfigHelper;
import com.ibm.fhir.config.FHIRConfiguration;
import com.ibm.fhir.exception.FHIRException;
import com.ibm.fhir.model.format.Format;
import com.ibm.fhir.model.parser.FHIRJsonParser;
import com.ibm.fhir.model.parser.FHIRParser;
import com.ibm.fhir.model.util.FHIRUtil;
import com.ibm.fhir.model.util.JsonSupport;
import com.ibm.fhir.notification.FHIRNotificationEvent;
import com.ibm.fhir.search.SearchConstants;

import jakarta.json.Json;
import jakarta.json.JsonBuilderFactory;
Expand All @@ -16,26 +30,39 @@
import jakarta.json.JsonReader;
import jakarta.json.JsonReaderFactory;

import com.ibm.fhir.exception.FHIRException;
import com.ibm.fhir.model.util.JsonSupport;
import com.ibm.fhir.notification.FHIRNotificationEvent;

/**
* FHIRNotificationUtil supports serializing and deserializing the FHIRNotificationEvent based on conditions.
*/
public class FHIRNotificationUtil {
private static final Logger LOG = Logger.getLogger(FHIRNotificationUtil.class.getSimpleName());
private static final JsonReaderFactory JSON_READER_FACTORY = Json.createReaderFactory(null);
private static final JsonBuilderFactory JSON_BUILDER_FACTORY = Json.createBuilderFactory(null);

private static final int DEFAULT_MAX_SIZE = 1000000;

private FHIRNotificationUtil() {
// No Operation
}

/**
* serialize the FHIRNotificationEvent
*
* @param jsonString the input string
* @return FHIRNotificationEvent without the Resource
*/
public static FHIRNotificationEvent toNotificationEvent(String jsonString) {
try (JsonReader reader = JSON_READER_FACTORY.createReader(new StringReader(jsonString))) {
JsonObject jsonObject = reader.readObject();
FHIRNotificationEvent event = new FHIRNotificationEvent();
event.setOperationType(jsonObject.getString("operationType"));
event.setLocation(jsonObject.getString("location"));
event.setLastUpdated(jsonObject.getString("lastUpdated"));
event.setLocation(jsonObject.getString("location"));
event.setOperationType(jsonObject.getString("operationType"));
event.setResourceId(jsonObject.getString("resourceId"));
event.setDatasourceId(jsonObject.getString("datasourceId"));
event.setTenantId(jsonObject.getString("tenantId"));
return event;
} catch (JsonException e) {
System.out.println("Failed to parse json string: " + e.getLocalizedMessage());
LOG.warning("Failed to parse json string: " + e.getLocalizedMessage());
return null;
}
}
Expand All @@ -58,11 +85,47 @@ public static String toJsonString(FHIRNotificationEvent event, boolean includeRe
builder.add("tenantId", event.getTenantId());

// If it's a delete operation, don't add as there is no actual resource in the event.
String jsonString;
if (!"delete".equals(event.getOperationType()) && includeResource && event.getResource() != null) {
builder.add("resource", JsonSupport.toJsonObject(event.getResource()));
JsonObject jsonObject = builder.build();

jsonString = jsonObject.toString();
long length = jsonString.getBytes().length;

int maxSize = FHIRConfigHelper.getIntProperty(FHIRConfiguration.PROPERTY_NOTIFICATION_MAX_SIZE, DEFAULT_MAX_SIZE);
if (length > maxSize) {
LOG.fine(() -> event.getResource().getClass().getSimpleName() + "/" + event.getResourceId() + " is over the size limit - '" + length + "' > '" + maxSize + "'");

// If we are including a subset, we'll add here.
String subset = FHIRConfigHelper.getStringProperty(FHIRConfiguration.PROPERTY_NOTIFICATION_NOTIFICATION_SIZE_BEHAVIOR, "subset");
if ("subset".equals(subset)) {
List<String> elements = Arrays.asList("id", "meta", "resourceType");
com.ibm.fhir.model.resource.Resource resource = FHIRParser.parser(Format.JSON)
.as(FHIRJsonParser.class)
.parseAndFilter(JsonSupport.toJsonObject(event.getResource()), elements);
// add a SUBSETTED tag to this resource to indicate that its elements have been filtered
if (!FHIRUtil.hasTag(resource, SearchConstants.SUBSETTED_TAG)) {
resource = FHIRUtil.addTag(resource, SearchConstants.SUBSETTED_TAG);
}
builder.add("resource", JsonSupport.toJsonObject(resource));
} else {
LOG.fine(() -> "Omitting the resource in FHIRNotificationEvent");
}
// the build method wipes out any preexisting values, we have to add them back.
builder.add("lastUpdated", event.getLastUpdated());
builder.add("location", event.getLocation());
builder.add("operationType", event.getOperationType());
builder.add("resourceId", event.getResourceId());
builder.add("datasourceId", event.getDatasourceId());
builder.add("tenantId", event.getTenantId());
jsonObject = builder.build();
}
jsonString = jsonObject.toString();
prb112 marked this conversation as resolved.
Show resolved Hide resolved
} else {
JsonObject jsonObject = builder.build();
jsonString = jsonObject.toString();
}
JsonObject jsonObject = builder.build();
String jsonString = jsonObject.toString();
return jsonString;
}
}
Loading