Skip to content

Commit

Permalink
fix: retain null fields in sync node merger (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
sameerzuberi authored Feb 1, 2024
1 parent 1e4f09f commit ef1ceeb
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.aws.greengrass.shadowmanager.ShadowManagerDAOImpl;
import com.aws.greengrass.shadowmanager.ShadowManagerDatabase;
import com.aws.greengrass.shadowmanager.exception.RetryableException;
import com.aws.greengrass.shadowmanager.model.dao.SyncInformation;
import com.aws.greengrass.shadowmanager.sync.IotDataPlaneClientFactory;
import com.aws.greengrass.shadowmanager.sync.RequestBlockingQueue;
import com.aws.greengrass.shadowmanager.sync.Retryer;
Expand All @@ -31,6 +32,7 @@
import org.mockito.Answers;
import org.mockito.Mock;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -81,7 +83,7 @@ void startNucleusWithConfig(String configFile, State expectedState, boolean mock
startNucleusWithConfig(configFile, expectedState, mockDatabase, false, true);
}

void startNucleusWithConfig(NucleusLaunchUtilsConfig config) throws InterruptedException {
private CountDownLatch setup(NucleusLaunchUtilsConfig config) {
CountDownLatch shadowManagerRunning = new CountDownLatch(1);
AtomicBoolean isSyncMocked = new AtomicBoolean(false);
kernel.parseArgs("-r", rootDir.toAbsolutePath().toString(), "-i",
Expand Down Expand Up @@ -129,13 +131,10 @@ void startNucleusWithConfig(NucleusLaunchUtilsConfig config) throws InterruptedE
.maxRetryInterval(Duration.ofSeconds(1))
.retryableExceptions(Collections.singletonList(RetryableException.class))
.build();
Retryer retryer = (retryConfig1, request, context) ->
RetryUtils.runWithRetry(retryConfig,
() -> {
Retryer retryer = (retryConfig1, request, context) -> RetryUtils.runWithRetry(retryConfig, () -> {
request.execute(context);
return null;
},
"test-setup", LogManager.getLogger(getClass()));
}, "test-setup", LogManager.getLogger(getClass()));
SyncHandler.setRetryer(retryer);
SyncStrategy syncStrategy;
if (RealTimeSyncStrategy.class.equals(config.getSyncClazz())) {
Expand All @@ -149,6 +148,29 @@ void startNucleusWithConfig(NucleusLaunchUtilsConfig config) throws InterruptedE
syncHandler.setOverallSyncStrategy(syncStrategy);
isSyncMocked.set(true);
}
return shadowManagerRunning;
}

void startNucleusWithConfig(NucleusLaunchUtilsConfig config) throws InterruptedException {
CountDownLatch shadowManagerRunning = setup(config);
kernel.launch();
assertTrue(shadowManagerRunning.await(TEST_TIME_OUT_SEC, TimeUnit.SECONDS));
}

void startNucleusWithConfigAndLocalShadowState(NucleusLaunchUtilsConfig config, String thingName,
String shadowName, String localShadowState)
throws InterruptedException {
CountDownLatch shadowManagerRunning = setup(config);

kernel.getContext().get(ShadowManagerDatabase.class).install();
ShadowManagerDAOImpl dao = kernel.getContext().get(ShadowManagerDAOImpl.class);
dao.updateShadowThing(thingName, shadowName, localShadowState.getBytes(StandardCharsets.UTF_8), 0);
dao.updateSyncInformation(SyncInformation.builder()
.thingName(thingName)
.shadowName(shadowName)
.lastSyncedDocument(localShadowState.getBytes(StandardCharsets.UTF_8))
.localVersion(0)
.build());

kernel.launch();
assertTrue(shadowManagerRunning.await(TEST_TIME_OUT_SEC, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,40 @@ void GIVEN_synced_shadow_WHEN_cloud_cleared_THEN_local_cleared(Class<?extends Ba
assertLocalShadowEquals("{\"state\":{}}");
}

@ParameterizedTest
@ValueSource(classes = {RealTimeSyncStrategy.class, PeriodicSyncStrategy.class})
void GIVEN_cloud_shadow_with_deleted_field_and_outdated_local_shadow_WHEN_full_sync_THEN_local_matches_cloud(Class<? extends BaseSyncStrategy> clazz, ExtensionContext context)
throws IoTDataPlaneClientCreationException, InterruptedException, IOException {
ignoreExceptionOfType(context, InterruptedException.class);
ignoreExceptionOfType(context, ConflictError.class);

String initialLocalState = "{\"state\":{\"desired\":{\"OtherKey\":\"foo\"}}}";
String initialCloudState = "{\"version\":1,\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}}";
String expectedLocalShadowState = "{\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}";

when(iotDataPlaneClientFactory.getIotDataPlaneClient().updateThingShadow(cloudUpdateThingShadowRequestCaptor.capture()))
.thenReturn(mockUpdateThingShadowResponse);

// setup initial cloud state
GetThingShadowResponse initialCloudStateShadowResponse = mock(GetThingShadowResponse.class, Answers.RETURNS_DEEP_STUBS);
lenient().when(initialCloudStateShadowResponse.payload().asByteArray()).thenReturn(initialCloudState.getBytes(UTF_8));
when(iotDataPlaneClientFactory.getIotDataPlaneClient().getThingShadow(any(GetThingShadowRequest.class))).thenReturn(initialCloudStateShadowResponse);

// start Nucleus with a local shadow preset
NucleusLaunchUtilsConfig config = NucleusLaunchUtilsConfig.builder()
.configFile(getSyncConfigFile(clazz))
.syncClazz(clazz)
.mockCloud(true)
.build();
startNucleusWithConfigAndLocalShadowState(config, MOCK_THING_NAME_1, CLASSIC_SHADOW, initialLocalState);

// wait for initial full sync to complete
verify(syncQueue, after(7000).atMost(5)).put(any(FullShadowSyncRequest.class));
assertEmptySyncQueue(clazz);
assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true)));
assertLocalShadowEquals(expectedLocalShadowState);
}

private void mockCloudUpdateResponsesWithIncreasingVersions() throws IoTDataPlaneClientCreationException {
when(iotDataPlaneClientFactory.getIotDataPlaneClient()
.updateThingShadow(cloudUpdateThingShadowRequestCaptor.capture()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ && isDocVersionSame(localShadowDocument.get(), syncInformation, DataOwner.LOCAL)
long cloudDocumentVersion = cloudShadowDocument.get().getVersion();

// If the cloud document version is different from the last sync, that means the local document needed
// some updates. So we go ahead an update the local shadow document.
// some updates. So we go ahead and update the local shadow document.
if (!isDocVersionSame(cloudShadowDocument.get(), syncInformation, DataOwner.CLOUD)) {
localDocumentVersion = updateLocalDocumentAndGetUpdatedVersion(updateDocument,
Optional.of(localDocumentVersion));
}
// If the local document version is different from the last sync, that means the cloud document needed
// some updates. So we go ahead an update the cloud shadow document.
// some updates. So we go ahead and update the cloud shadow document.
if (!isDocVersionSame(localShadowDocument.get(), syncInformation, DataOwner.LOCAL)) {
cloudDocumentVersion = updateCloudDocumentAndGetUpdatedVersion(updateDocument,
Optional.of(cloudDocumentVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private JsonMerger() {

/**
* Merges the patch JSON node to the existing source JSON node. If the node already exists in the source, then
* it replaces it. If the node is an array, then the the source array's contents are overwritten with the contents
* it replaces it. If the node is an array, then the source array's contents are overwritten with the contents
* from the patch.
*
* @param source The source JSON to merge.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ private static void iterateOverAllUnvisitedFields(JsonNode local, JsonNode cloud
final JsonNode cloudValue = cloud.get(field);
final JsonNode baseValue = base.get(field);
JsonNode mergedResult = getMergedNode(localValue, cloudValue, baseValue, owner);
if (mergedResult != null) {
((ObjectNode) result).set(field, mergedResult);
((ObjectNode) result).set(field, mergedResult);
if (mergedResult == null) {
visited.add(field);
}
visited.add(field);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class FullShadowSyncRequestTest {
private static final byte[] CLOUD_DOCUMENT_WITH_METADATA = "{\"version\": 5, \"state\": {\"reported\": {\"name\": \"The Beatles\", \"OldField\": true}, \"desired\": {\"name\": \"Backstreet Boys\", \"SomeOtherThingNew\": 100}}, \"metadata\": {\"reported\": {\"name\": {\"timestamp\": 100}, \"OldField\": {\"timestamp\": 100}}, \"desired\": {\"name\": {\"timestamp\": 100}, \"SomeOtherThingNew\": {\"timestamp\": 100}}}}".getBytes();
private static final byte[] CLOUD_DOCUMENT_WITH_DELTA = "{\"version\": 5, \"state\": {\"reported\": {\"name\": \"The Beatles\", \"OldField\": true}, \"desired\": {\"name\": \"Backstreet Boys\", \"SomeOtherThingNew\": 100}, \"delta\": {\"name\": \"The Beatles\", \"OldField\": true}}}".getBytes();
private static final byte[] BASE_DOCUMENT = "{\"version\": 1, \"state\": {\"reported\": {\"name\": \"The Beatles\", \"OldField\": true}, \"desired\": {\"name\": \"The Beatles\"}}}".getBytes();
private static final byte[] MERGED_DOCUMENT = "{\"state\": {\"reported\": {\"name\": \"The Beach Boys\", \"NewField\": 100}, \"desired\": {\"name\": \"Backstreet Boys\", \"SomethingNew\": true, \"SomeOtherThingNew\": 100}}}".getBytes();
// TODO: Refactor class so the null "OldField" can be removed from the merged document. This is present because
// SyncNodeMerger will set removed fields as null, which are then handled by UpdateThingShadowRequestHandler,
// which currently has mocked behavior resulting in null fields not being removed within these tests.
private static final byte[] MERGED_DOCUMENT = "{\"state\": {\"reported\": {\"name\": \"The Beach Boys\", \"NewField\": 100, \"OldField\": null}, \"desired\": {\"name\": \"Backstreet Boys\", \"SomethingNew\": true, \"SomeOtherThingNew\": 100}}}".getBytes();
private static final byte[] BAD_DOCUMENT = "{\"version\": true}".getBytes();

@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class SyncNodeMergerTest {
private static final byte[] CLOUD_DOCUMENT = "{\"name\": \"The Beatles\", \"temperature\": 80, \"OldField\": true}".getBytes();
private static final byte[] CLOUD_DOCUMENT_CHANGED = "{\"name\": \"Pink Floyd\", \"temperature\": 60, \"OldField\": true, \"SomeOtherThingNew\": 100}".getBytes();
private static final byte[] BASE_DOCUMENT = "{\"name\": \"The Beatles\", \"temperature\": 70, \"OldField\": true}".getBytes();
private static final byte[] MERGED_DOCUMENT = "{\"name\":\"The Beach Boys\", \"temperature\": 80, \"NewField\":100}".getBytes();
private static final byte[] MERGED_DOCUMENT_WITH_CLOUD_CHANGED_AND_CLOUD_OWNER = "{\"name\":\"Pink Floyd\", \"temperature\": 60,\"NewField\":100,\"SomeOtherThingNew\":100}".getBytes();
private static final byte[] MERGED_DOCUMENT_WITH_CLOUD_CHANGED_AND_LOCAL_OWNER = "{\"name\":\"The Beach Boys\", \"temperature\": 80,\"NewField\":100,\"SomeOtherThingNew\":100}".getBytes();
private static final byte[] MERGED_DOCUMENT = "{\"name\":\"The Beach Boys\", \"temperature\": 80, \"NewField\":100, \"OldField\":null}".getBytes();
private static final byte[] MERGED_DOCUMENT_WITH_CLOUD_CHANGED_AND_CLOUD_OWNER = "{\"name\":\"Pink Floyd\", \"temperature\": 60,\"NewField\":100,\"OldField\":null,\"SomeOtherThingNew\":100}".getBytes();
private static final byte[] MERGED_DOCUMENT_WITH_CLOUD_CHANGED_AND_LOCAL_OWNER = "{\"name\":\"The Beach Boys\", \"temperature\": 80,\"NewField\":100,\"OldField\":null,\"SomeOtherThingNew\":100}".getBytes();

private final static byte[] LOCAL_WITH_ARRAY_STRING = "{\"id\": 100, \"SomeArrayKey\": [\"SomeValue1\", \"SomeValue2\"]}".getBytes();
private final static byte[] CLOUD_WITH_ARRAY_STRING = "{\"id\": 100, \"SomeArrayKey\": [\"SomeValue3\", \"SomeValue4\"]}".getBytes();
Expand Down

0 comments on commit ef1ceeb

Please sign in to comment.